You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:24 UTC
[15/15] flink git commit: [FLINK-6701] Activate strict checkstyle for flink-yarn
[FLINK-6701] Activate strict checkstyle for flink-yarn
This closes #3990.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77b0fb9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77b0fb9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77b0fb9f
Branch: refs/heads/master
Commit: 77b0fb9fe3656a5ae7e2ca3bbce28cfa5a0e247e
Parents: d313ac7
Author: zentol <ch...@apache.org>
Authored: Wed May 24 15:10:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 26 19:17:59 2017 +0200
----------------------------------------------------------------------
.../yarn/AbstractYarnClusterDescriptor.java | 165 +++++++-------
...bstractYarnFlinkApplicationMasterRunner.java | 21 +-
.../flink/yarn/RegisteredYarnWorkerNode.java | 6 +-
.../main/java/org/apache/flink/yarn/Utils.java | 72 +++---
.../flink/yarn/YarnApplicationMasterRunner.java | 58 +++--
.../apache/flink/yarn/YarnClusterClient.java | 71 +++---
.../apache/flink/yarn/YarnClusterClientV2.java | 8 +-
.../flink/yarn/YarnClusterDescriptor.java | 2 +-
.../flink/yarn/YarnClusterDescriptorV2.java | 4 +-
.../org/apache/flink/yarn/YarnConfigKeys.java | 18 +-
.../flink/yarn/YarnContainerInLaunch.java | 3 +-
.../yarn/YarnFlinkApplicationMasterRunner.java | 27 +--
.../flink/yarn/YarnFlinkResourceManager.java | 64 +++---
.../apache/flink/yarn/YarnResourceManager.java | 82 ++++---
.../YarnResourceManagerCallbackHandler.java | 8 +-
.../flink/yarn/YarnTaskExecutorRunner.java | 9 +-
.../flink/yarn/YarnTaskManagerRunner.java | 19 +-
.../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 99 +++++----
.../flink/yarn/cli/FlinkYarnSessionCli.java | 220 +++++++++----------
.../yarn/configuration/YarnConfigOptions.java | 7 +-
.../YarnHighAvailabilityServices.java | 36 +--
.../YarnIntraNonHaMasterServices.java | 16 +-
.../YarnPreConfiguredMasterNonHaServices.java | 12 +-
.../yarn/messages/ContainersAllocated.java | 11 +-
.../flink/yarn/messages/ContainersComplete.java | 12 +-
flink-yarn/src/main/resources/log4j.properties | 1 -
.../apache/flink/yarn/ApplicationClient.scala | 6 +-
.../org/apache/flink/yarn/YarnJobManager.scala | 1 -
.../org/apache/flink/yarn/YarnMessages.scala | 5 +-
.../org/apache/flink/yarn/YarnTaskManager.scala | 2 +-
.../yarn/TestingYarnFlinkResourceManager.java | 6 +-
.../java/org/apache/flink/yarn/UtilsTest.java | 29 ++-
.../yarn/YarnApplicationMasterRunnerTest.java | 15 +-
.../flink/yarn/YarnClusterDescriptorTest.java | 27 ++-
.../YarnIntraNonHaMasterServicesTest.java | 25 ++-
.../YarnPreConfiguredMasterHaServicesTest.java | 29 +--
.../messages/NotifyWhenResourcesRegistered.java | 3 +
.../RequestNumberOfRegisteredResources.java | 5 +-
38 files changed, 611 insertions(+), 593 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b9a4416..2315c70 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -109,6 +110,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
/**
* If the user has specified a different number of slots, we store them here
+ * Files (usually in a distributed file system) used for the YARN session of Flink.
+ * Contains configuration files and jar files.
+ */
+ private Path sessionFilesDir;
+
+ /**
+ * If the user has specified a different number of slots, we store them here.
*/
private int slots = -1;
@@ -128,7 +136,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private String dynamicPropertiesEncoded;
- /** Lazily initialized list of files to ship */
+ /** Lazily initialized list of files to ship. */
protected List<File> shipFiles = new LinkedList<>();
private org.apache.flink.configuration.Configuration flinkConfiguration;
@@ -140,18 +148,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private String zookeeperNamespace;
/** Optional Jar file to include in the system class loader of all application nodes
- * (for per-job submission) */
+ * (for per-job submission). */
private final Set<File> userJarFiles = new HashSet<>();
private YarnConfigOptions.UserJarInclusion userJarInclusion;
public AbstractYarnClusterDescriptor() {
// for unit tests only
- if(System.getenv("IN_TESTS") != null) {
+ if (System.getenv("IN_TESTS") != null) {
try {
conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
} catch (Throwable t) {
- throw new RuntimeException("Error",t);
+ throw new RuntimeException("Error", t);
}
}
@@ -183,17 +191,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
protected abstract Class<?> getApplicationMasterClass();
public void setJobManagerMemory(int memoryMb) {
- if(memoryMb < MIN_JM_MEMORY) {
+ if (memoryMb < MIN_JM_MEMORY) {
throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_JM_MEMORY+ " MB");
+ + "of " + MIN_JM_MEMORY + " MB");
}
this.jobManagerMemoryMb = memoryMb;
}
public void setTaskManagerMemory(int memoryMb) {
- if(memoryMb < MIN_TM_MEMORY) {
+ if (memoryMb < MIN_TM_MEMORY) {
throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_TM_MEMORY+ " MB");
+ + "of " + MIN_TM_MEMORY + " MB");
}
this.taskManagerMemoryMb = memoryMb;
}
@@ -209,7 +217,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
public void setTaskManagerSlots(int slots) {
- if(slots <= 0) {
+ if (slots <= 0) {
throw new IllegalArgumentException("Number of TaskManager slots must be positive");
}
this.slots = slots;
@@ -224,7 +232,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
public void setLocalJarPath(Path localJarPath) {
- if(!localJarPath.toString().endsWith("jar")) {
+ if (!localJarPath.toString().endsWith("jar")) {
throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
}
this.flinkJarPath = localJarPath;
@@ -239,7 +247,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
public void setTaskManagerCount(int tmCount) {
- if(tmCount < 1) {
+ if (tmCount < 1) {
throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
}
this.taskManagerCount = tmCount;
@@ -253,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
for (File shipFile: shipFiles) {
// remove uberjar from ship list (by default everything in the lib/ folder is added to
// the list of files to ship, but we handle the uberjar separately.
- if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+ if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
this.shipFiles.add(shipFile);
}
}
@@ -274,7 +282,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return false;
}
try {
- for(URL jarFile : requiredJarFiles) {
+ for (URL jarFile : requiredJarFiles) {
if (!userJarFiles.contains(new File(jarFile.toURI()))) {
return false;
}
@@ -303,21 +311,20 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return this.dynamicPropertiesEncoded;
}
-
private void isReadyForDeployment() throws YarnDeploymentException {
- if(taskManagerCount <= 0) {
+ if (taskManagerCount <= 0) {
throw new YarnDeploymentException("Taskmanager count must be positive");
}
- if(this.flinkJarPath == null) {
+ if (this.flinkJarPath == null) {
throw new YarnDeploymentException("The Flink jar path is null");
}
- if(this.configurationDirectory == null) {
+ if (this.configurationDirectory == null) {
throw new YarnDeploymentException("Configuration directory not set");
}
- if(this.flinkConfigurationPath == null) {
+ if (this.flinkConfigurationPath == null) {
throw new YarnDeploymentException("Configuration path not set");
}
- if(this.flinkConfiguration == null) {
+ if (this.flinkConfiguration == null) {
throw new YarnDeploymentException("Flink configuration object has not been set");
}
@@ -337,7 +344,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
// check if required Hadoop environment variables are set. If not, warn user
- if(System.getenv("HADOOP_CONF_DIR") == null &&
+ if (System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
@@ -346,8 +353,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
- for(int i = 0; i < nodeManagers.length; i++) {
- if(nodeManagers[i] >= toAllocate) {
+ for (int i = 0; i < nodeManagers.length; i++) {
+ if (nodeManagers[i] >= toAllocate) {
nodeManagers[i] -= toAllocate;
return true;
}
@@ -372,7 +379,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
/**
- * Gets a Hadoop Yarn client
+ * Gets a Hadoop Yarn client.
* @return Returns a YarnClient which has to be shutdown manually
*/
protected YarnClient getYarnClient() {
@@ -420,7 +427,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public YarnClusterClient deploy() {
try {
- if(UserGroupInformation.isSecurityEnabled()) {
+ if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
// so we check only in ticket cache scenario.
@@ -453,7 +460,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final YarnClient yarnClient = getYarnClient();
-
// ------------------ Check if the specified queue exists --------------------
try {
@@ -477,9 +483,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
} else {
LOG.debug("The YARN cluster does not have any queues configured");
}
- } catch(Throwable e) {
+ } catch (Throwable e) {
LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Error details", e);
}
}
@@ -495,7 +501,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// the yarnMinAllocationMB specifies the smallest possible container allocation size.
// all allocations below this value are automatically set to this value.
final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
- if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+ if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
@@ -503,10 +509,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
// set the memory to minAllocationMB to do the next checks correctly
- if(jobManagerMemoryMb < yarnMinAllocationMB) {
+ if (jobManagerMemoryMb < yarnMinAllocationMB) {
jobManagerMemoryMb = yarnMinAllocationMB;
}
- if(taskManagerMemoryMb < yarnMinAllocationMB) {
+ if (taskManagerMemoryMb < yarnMinAllocationMB) {
taskManagerMemoryMb = yarnMinAllocationMB;
}
@@ -515,56 +521,56 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
- final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if(jobManagerMemoryMb > maxRes.getMemory() ) {
+ final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+ if (jobManagerMemoryMb > maxRes.getMemory()) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+ + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
}
- if(taskManagerMemoryMb > maxRes.getMemory() ) {
+ if (taskManagerMemoryMb > maxRes.getMemory()) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+ + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
}
- final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+ final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources are currently not available in the cluster. " +
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
- if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+ if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
- + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+ + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
}
- if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
}
- if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+ + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
}
// ----------------- check if the requested containers fit into the cluster.
int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
- if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+ if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
- Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+ Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
- for(int i = 0; i < taskManagerCount; i++) {
- if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+ for (int i = 0; i < taskManagerCount; i++) {
+ if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available in the YARN cluster. " +
"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
- "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC );
+ "the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
}
}
@@ -669,7 +675,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();
- // upload and register ship files
+ // upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
List<String> userClassPaths;
@@ -752,9 +758,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
Path remoteKrb5Path = null;
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
- if(System.getenv("IN_TESTS") != null) {
+ if (System.getenv("IN_TESTS") != null) {
String krb5Config = System.getProperty("java.security.krb5.conf");
- if(krb5Config != null && krb5Config.length() != 0) {
+ if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
@@ -762,7 +768,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
- File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+ File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnSitePath = new Path(f.getAbsolutePath());
@@ -777,7 +783,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LocalResource keytabResource = null;
Path remotePathKeytab = null;
String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
- if(keytab != null) {
+ if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
keytabResource = Records.newRecord(LocalResource.class);
Path keytabPath = new Path(keytab);
@@ -787,7 +793,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
- if ( UserGroupInformation.isSecurityEnabled() && keytab == null ) {
+ if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
//set tokens only when keytab is not provided
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, conf);
@@ -806,7 +812,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
- appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() );
+ appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
@@ -818,19 +824,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
- if(keytabResource != null) {
- appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
+ if (keytabResource != null) {
+ appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
- appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
+ appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
}
//To support Yarn Secure Integration Test Scenario
- if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+ if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString() );
+ appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
- if(dynamicPropertiesEncoded != null) {
+ if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
@@ -845,9 +851,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
capability.setVirtualCores(1);
String name;
- if(customName == null) {
+ if (customName == null) {
name = "Flink session with " + taskManagerCount + " TaskManagers";
- if(detached) {
+ if (detached) {
name += " (detached)";
}
} else {
@@ -858,7 +864,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appContext.setApplicationType("Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
- if(yarnQueue != null) {
+ if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
@@ -874,7 +880,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
- loop: while( true ) {
+ loop: while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
@@ -899,7 +905,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
- if(System.currentTimeMillis() - startTime > 60000) {
+ if (System.currentTimeMillis() - startTime > 60000) {
LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
@@ -922,7 +928,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
return report;
}
-
+
private static List<String> uploadAndRegisterFiles(
Collection<File> shipFiles,
FileSystem fs,
@@ -971,7 +977,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
/**
* Kills YARN application and stops YARN client.
*
- * Use this method to kill the App before it has been properly deployed
+ * <p>Use this method to kill the App before it has been properly deployed
*/
private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
LOG.info("Killing YARN application");
@@ -986,11 +992,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
yarnClient.stop();
}
-
private static class ClusterResourceDescription {
- final public int totalFreeMemory;
- final public int containerLimit;
- final public int[] nodeManagersFree;
+ public final int totalFreeMemory;
+ public final int containerLimit;
+ public final int[] nodeManagersFree;
public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
this.totalFreeMemory = totalFreeMemory;
@@ -1006,12 +1011,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
int containerLimit = 0;
int[] nodeManagersFree = new int[nodes.size()];
- for(int i = 0; i < nodes.size(); i++) {
+ for (int i = 0; i < nodes.size(); i++) {
NodeReport rep = nodes.get(i);
- int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+ int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
nodeManagersFree[i] = free;
totalFreeMemory += free;
- if(free > containerLimit) {
+ if (free > containerLimit) {
containerLimit = free;
}
}
@@ -1060,7 +1065,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
public void setName(String name) {
- if(name == null) {
+ if (name == null) {
throw new IllegalArgumentException("The passed name is null");
}
customName = name;
@@ -1098,9 +1103,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
* Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
* supports various methods which, depending on the Hadoop version, may or may not be supported.
*
- * If an unsupported method is invoked, nothing happens.
+ * <p>If an unsupported method is invoked, nothing happens.
*
- * Currently three methods are proxied:
+ * <p>Currently three methods are proxied:
* - setApplicationTags (>= 2.4.0)
* - setAttemptFailuresValidityInterval (>= 2.6.0)
* - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
@@ -1302,11 +1307,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if (hasLogback || hasLog4j) {
logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
- if(hasLogback) {
+ if (hasLogback) {
logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
}
- if(hasLog4j) {
+ if (hasLog4j) {
logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
}
}
@@ -1345,7 +1350,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
/**
- * Creates a YarnClusterClient; may be overriden in tests
+ * Creates a YarnClusterClient; may be overriden in tests.
*/
protected YarnClusterClient createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 4b24f42..8bf6a2e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,21 +43,20 @@ import java.util.concurrent.Callable;
* It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
* and {@link YarnResourceManager}.
*
- * The JobMasters handles Flink job execution, while the YarnResourceManager handles container
+ * <p>The JobMasters handles Flink job execution, while the YarnResourceManager handles container
* allocation and failure detection.
*/
public abstract class AbstractYarnFlinkApplicationMasterRunner {
- /** Logger */
protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
- /** The process environment variables */
+ /** The process environment variables. */
protected static final Map<String, String> ENV = System.getenv();
- /** The exit code returned if the initialization of the application master failed */
+ /** The exit code returned if the initialization of the application master failed. */
protected static final int INIT_ERROR_EXIT_CODE = 31;
- /** The host name passed by env */
+ /** The host name passed by env. */
protected String appMasterHostname;
/**
@@ -87,7 +86,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
String keytabPath = null;
- if(remoteKeytabPath != null) {
+ if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.debug("Keytab path: {}", keytabPath);
@@ -96,7 +95,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
- currentUser.getShortUserName(), yarnClientUsername );
+ currentUser.getShortUserName(), yarnClientUsername);
// Flink configuration
final Map<String, String> dynamicProperties =
@@ -122,7 +121,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
}
SecurityUtils.SecurityConfiguration sc;
- if(hadoopConfiguration != null) {
+ if (hadoopConfiguration != null) {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
} else {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
@@ -170,7 +169,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
/**
* @param baseDirectory The working directory
* @param additional Additional parameters
- *
+ *
* @return The configuration to be used by the TaskExecutors.
*/
private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
@@ -194,7 +193,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
}
- // if the user has set the deprecated YARN-specific config keys, we add the
+ // if the user has set the deprecated YARN-specific config keys, we add the
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index cb2f40a..5f059bf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -19,8 +19,8 @@
package org.apache.flink.yarn;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
import org.apache.hadoop.yarn.api.records.Container;
import static java.util.Objects.requireNonNull;
@@ -30,10 +30,10 @@ import static java.util.Objects.requireNonNull;
*/
public class RegisteredYarnWorkerNode implements ResourceIDRetrievable {
- /** The container on which the worker runs */
+ /** The container on which the worker runs. */
private final Container yarnContainer;
- /** The resource id associated with this worker type */
+ /** The resource id associated with this worker type. */
private final ResourceID resourceID;
public RegisteredYarnWorkerNode(Container yarnContainer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 60f7204..698b69e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,25 +18,9 @@
package org.apache.flink.yarn;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -57,6 +42,20 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -64,20 +63,20 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
* Utility class that provides helper methods to work with Apache Hadoop YARN.
*/
public final class Utils {
-
+
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
- /** Keytab file name populated in YARN container */
+ /** Keytab file name populated in YARN container. */
public static final String KEYTAB_FILE_NAME = "krb5.keytab";
- /** KRB5 file name populated in YARN container for secure IT run */
+ /** KRB5 file name populated in YARN container for secure IT run. */
public static final String KRB5_FILE_NAME = "krb5.conf";
- /** Yarn site xml file name populated in YARN container for secure IT run */
+ /** Yarn site xml file name populated in YARN container for secure IT run. */
public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
/**
- * See documentation
+ * See documentation.
*/
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
@@ -102,14 +101,13 @@ public final class Utils {
+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
}
- int heapLimit = (int)((float)memory * memoryCutoffRatio);
+ int heapLimit = (int) ((float) memory * memoryCutoffRatio);
if (heapLimit < minCutoff) {
heapLimit = minCutoff;
}
return memory - heapLimit;
}
-
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
addToEnvironment(
appMasterEnv,
@@ -123,9 +121,7 @@ public final class Utils {
}
}
-
/**
- *
* @return Path to remote file (usually hdfs)
* @throws IOException
*/
@@ -165,7 +161,7 @@ public final class Utils {
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
- for(Token<? extends TokenIdentifier> token : usrTok) {
+ for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
@@ -173,7 +169,7 @@ public final class Utils {
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
@@ -193,7 +189,7 @@ public final class Utils {
// Intended call: HBaseConfiguration.addHbaseResources(conf);
Class
.forName("org.apache.hadoop.hbase.HBaseConfiguration")
- .getMethod("addHbaseResources", Configuration.class )
+ .getMethod("addHbaseResources", Configuration.class)
.invoke(null, conf);
// ----
@@ -220,7 +216,7 @@ public final class Utils {
credentials.addToken(token.getService(), token);
LOG.info("Added HBase Kerberos security token to credentials.");
- } catch ( ClassNotFoundException
+ } catch (ClassNotFoundException
| NoSuchMethodException
| IllegalAccessException
| InvocationTargetException e) {
@@ -231,7 +227,7 @@ public final class Utils {
}
/**
- * Copied method from org.apache.hadoop.yarn.util.Apps
+ * Copied method from org.apache.hadoop.yarn.util.Apps.
* It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
* by https://issues.apache.org/jira/browse/YARN-1931
*/
@@ -262,8 +258,8 @@ public final class Utils {
*/
public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
Map<String, String> result = new HashMap<>();
- for(Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
- if(entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
+ for (Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
+ if (entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
// remove prefix
String key = entry.getKey().substring(envPrefix.length());
result.put(key, entry.getValue());
@@ -347,7 +343,7 @@ public final class Utils {
//register keytab
LocalResource keytabResource = null;
- if(remoteKeytabPath != null) {
+ if (remoteKeytabPath != null) {
log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
keytabResource = Records.newRecord(LocalResource.class);
Path keytabPath = new Path(remoteKeytabPath);
@@ -359,7 +355,7 @@ public final class Utils {
LocalResource yarnConfResource = null;
LocalResource krb5ConfResource = null;
boolean hasKrb5 = false;
- if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+ if (remoteYarnConfPath != null && remoteKrb5Path != null) {
log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnConfPath = new Path(remoteYarnConfPath);
@@ -405,12 +401,12 @@ public final class Utils {
taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
//To support Yarn Secure Integration Test Scenario
- if(yarnConfResource != null && krb5ConfResource != null) {
+ if (yarnConfResource != null && krb5ConfResource != null) {
taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
}
- if(keytabResource != null) {
+ if (keytabResource != null) {
taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
}
@@ -450,7 +446,7 @@ public final class Utils {
containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
- if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+ if (remoteKeytabPath != null && remoteKeytabPrincipal != null) {
containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 64417f6..a424740 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,10 +18,6 @@
package org.apache.flink.yarn;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -48,19 +44,17 @@ import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.concurrent.duration.FiniteDuration;
-
import java.io.File;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -69,41 +63,43 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.yarn.Utils.require;
/**
* This class is the executable entry point for the YARN application master.
* It starts actor system and the actors for {@link JobManager}
* and {@link YarnFlinkResourceManager}.
- *
- * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ *
+ * <p>The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
* allocation and failure detection.
*/
public class YarnApplicationMasterRunner {
- /** Logger */
protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
/** The maximum time that TaskManagers may be waiting to register at the JobManager,
- * before they quit */
+ * before they quit. */
private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
- /** The process environment variables */
+ /** The process environment variables. */
private static final Map<String, String> ENV = System.getenv();
- /** The exit code returned if the initialization of the application master failed */
+ /** The exit code returned if the initialization of the application master failed. */
private static final int INIT_ERROR_EXIT_CODE = 31;
- /** The exit code returned if the process exits because a critical actor died */
+ /** The exit code returned if the process exits because a critical actor died. */
private static final int ACTOR_DIED_EXIT_CODE = 32;
-
// ------------------------------------------------------------------------
// Program entry point
// ------------------------------------------------------------------------
/**
- * The entry point for the YARN application master.
+ * The entry point for the YARN application master.
*
* @param args The command line arguments.
*/
@@ -144,7 +140,7 @@ public class YarnApplicationMasterRunner {
LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
String keytabPath = null;
- if(remoteKeytabPath != null) {
+ if (remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
keytabPath = f.getAbsolutePath();
LOG.debug("keytabPath: {}", keytabPath);
@@ -153,7 +149,7 @@ public class YarnApplicationMasterRunner {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
- currentUser.getShortUserName(), yarnClientUsername );
+ currentUser.getShortUserName(), yarnClientUsername);
// Flink configuration
final Map<String, String> dynamicProperties =
@@ -172,7 +168,7 @@ public class YarnApplicationMasterRunner {
//To support Yarn Secure Integration Test Scenario
File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
- if(krb5Conf.exists() && krb5Conf.canRead()) {
+ if (krb5Conf.exists() && krb5Conf.canRead()) {
String krb5Path = krb5Conf.getAbsolutePath();
LOG.info("KRB5 Conf: {}", krb5Path);
hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
@@ -181,7 +177,7 @@ public class YarnApplicationMasterRunner {
}
SecurityUtils.SecurityConfiguration sc;
- if(hadoopConfiguration != null) {
+ if (hadoopConfiguration != null) {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
} else {
sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
@@ -298,7 +294,6 @@ public class YarnApplicationMasterRunner {
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
-
// ----------------- (2) start the actor system -------------------
// try to start the actor system, JobManager and JobManager actor system
@@ -314,7 +309,6 @@ public class YarnApplicationMasterRunner {
LOG.info("Actor system bound to hostname {}.", akkaHostname);
-
// ---- (3) Generate the configuration for the TaskManagers
final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
@@ -326,7 +320,6 @@ public class YarnApplicationMasterRunner {
taskManagerParameters, taskManagerConfig,
currDir, getTaskManagerClass(), LOG);
-
// ---- (4) start the actors and components in this order:
// 1) JobManager & Archive (in non-HA case, the leader service takes this)
@@ -360,7 +353,6 @@ public class YarnApplicationMasterRunner {
getJobManagerClass(),
getArchivistClass())._1();
-
// 2: the web monitor
LOG.debug("Starting Web Frontend");
@@ -390,7 +382,7 @@ public class YarnApplicationMasterRunner {
webMonitorURL,
taskManagerParameters,
taskManagerContext,
- numInitialTaskManagers,
+ numInitialTaskManagers,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
@@ -467,7 +459,6 @@ public class YarnApplicationMasterRunner {
return 0;
}
-
// ------------------------------------------------------------------------
// For testing, this allows to override the actor classes used for
// JobManager and the archive of completed jobs
@@ -494,10 +485,11 @@ public class YarnApplicationMasterRunner {
// ------------------------------------------------------------------------
/**
- *
- * @param baseDirectory
- * @param additional
- *
+ * Reads the global configuration from the given directory and adds the given parameters to it.
+ *
+ * @param baseDirectory directory to load the configuration from
+ * @param additional additional parameters to be included in the configuration
+ *
* @return The configuration to be used by the TaskManagers.
*/
@SuppressWarnings("deprecation")
@@ -522,7 +514,7 @@ public class YarnApplicationMasterRunner {
configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
}
- // if the user has set the deprecated YARN-specific config keys, we add the
+ // if the user has set the deprecated YARN-specific config keys, we add the
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 7042f99..a435ef7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -15,14 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.yarn;
-import akka.actor.ActorRef;
+package org.apache.flink.yarn;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
@@ -39,6 +34,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -47,10 +48,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
@@ -59,6 +56,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
/**
* Java representation of a running Flink cluster within YARN.
*/
@@ -84,7 +86,7 @@ public class YarnClusterClient extends ClusterClient {
private boolean isConnected = true;
- /** Indicator whether this cluster has just been created */
+ /** Indicator whether this cluster has just been created. */
private final boolean newlyCreatedCluster;
/**
@@ -128,7 +130,7 @@ public class YarnClusterClient extends ClusterClient {
}
/**
- * Disconnect from the Yarn cluster
+ * Disconnect from the Yarn cluster.
*/
public void disconnect() {
@@ -136,7 +138,7 @@ public class YarnClusterClient extends ClusterClient {
return;
}
- if(!isConnected) {
+ if (!isConnected) {
throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
}
@@ -151,7 +153,7 @@ public class YarnClusterClient extends ClusterClient {
try {
pollingRunner.stopRunner();
pollingRunner.join(1000);
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.warn("Shutdown of the polling runner was interrupted", e);
Thread.currentThread().interrupt();
}
@@ -159,7 +161,6 @@ public class YarnClusterClient extends ClusterClient {
isConnected = false;
}
-
// -------------------------- Interaction with the cluster ------------------------
/*
@@ -209,7 +210,7 @@ public class YarnClusterClient extends ClusterClient {
@Override
public String getWebInterfaceURL() {
// there seems to be a difference between HD 2.2.0 and 2.6.0
- if(!trackingURL.startsWith("http://")) {
+ if (!trackingURL.startsWith("http://")) {
return "http://" + trackingURL;
} else {
return trackingURL;
@@ -226,10 +227,10 @@ public class YarnClusterClient extends ClusterClient {
*/
@Override
public GetClusterStatusResponse getClusterStatus() {
- if(!isConnected) {
+ if (!isConnected) {
throw new IllegalStateException("The cluster is not connected to the cluster.");
}
- if(hasBeenShutdown()) {
+ if (hasBeenShutdown()) {
throw new IllegalStateException("The cluster has already been shutdown.");
}
@@ -245,17 +246,17 @@ public class YarnClusterClient extends ClusterClient {
}
public ApplicationStatus getApplicationStatus() {
- if(!isConnected) {
+ if (!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
}
ApplicationReport lastReport = null;
- if(pollingRunner == null) {
+ if (pollingRunner == null) {
LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster." +
"The system might be in an erroneous state");
} else {
lastReport = pollingRunner.getLastReport();
}
- if(lastReport == null) {
+ if (lastReport == null) {
LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far." +
"The system might be in an erroneous state");
return ApplicationStatus.UNKNOWN;
@@ -264,7 +265,7 @@ public class YarnClusterClient extends ClusterClient {
ApplicationStatus status =
(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ?
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
- if(status != ApplicationStatus.SUCCEEDED) {
+ if (status != ApplicationStatus.SUCCEEDED) {
LOG.warn("YARN reported application state {}", appState);
LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
}
@@ -275,17 +276,17 @@ public class YarnClusterClient extends ClusterClient {
@Override
public List<String> getNewMessages() {
- if(hasBeenShutdown()) {
+ if (hasBeenShutdown()) {
throw new RuntimeException("The YarnClusterClient has already been stopped");
}
- if(!isConnected) {
+ if (!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
}
List<String> ret = new ArrayList<String>();
// get messages from ApplicationClient (locally)
- while(true) {
+ while (true) {
Object result;
try {
Future<Object> response =
@@ -294,23 +295,23 @@ public class YarnClusterClient extends ClusterClient {
YarnMessages.getLocalGetYarnMessage(),
new Timeout(akkaDuration));
result = Await.result(response, akkaDuration);
- } catch(Exception ioe) {
+ } catch (Exception ioe) {
LOG.warn("Error retrieving the YARN messages locally", ioe);
break;
}
- if(!(result instanceof Option)) {
+ if (!(result instanceof Option)) {
throw new RuntimeException("LocalGetYarnMessage requires a response of type " +
"Option. Instead the response is of type " + result.getClass() + ".");
} else {
Option messageOption = (Option) result;
LOG.debug("Received message option {}", messageOption);
- if(messageOption.isEmpty()) {
+ if (messageOption.isEmpty()) {
break;
} else {
Object obj = messageOption.get();
- if(obj instanceof InfoMessage) {
+ if (obj instanceof InfoMessage) {
InfoMessage msg = (InfoMessage) obj;
ret.add("[" + msg.date() + "] " + msg.message());
} else {
@@ -339,7 +340,7 @@ public class YarnClusterClient extends ClusterClient {
}
/**
- * Shuts down the Yarn application
+ * Shuts down the Yarn application.
*/
public void shutdownCluster() {
@@ -365,7 +366,7 @@ public class YarnClusterClient extends ClusterClient {
"Flink YARN Client requested shutdown"),
new Timeout(akkaDuration));
Await.ready(response, akkaDuration);
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.warn("Error while stopping YARN cluster.", e);
}
@@ -385,7 +386,7 @@ public class YarnClusterClient extends ClusterClient {
try {
pollingRunner.stopRunner();
pollingRunner.join(1000);
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.warn("Shutdown of the polling runner was interrupted", e);
Thread.currentThread().interrupt();
}
@@ -420,7 +421,6 @@ public class YarnClusterClient extends ClusterClient {
return hasBeenShutDown.get();
}
-
private class ClientShutdownHook extends Thread {
@Override
public void run() {
@@ -446,14 +446,13 @@ public class YarnClusterClient extends ClusterClient {
private final Object lock = new Object();
private ApplicationReport lastReport;
-
public PollingThread(YarnClient yarnClient, ApplicationId appId) {
this.yarnClient = yarnClient;
this.appId = appId;
}
public void stopRunner() {
- if(!running.get()) {
+ if (!running.get()) {
LOG.warn("Polling thread was already stopped");
}
running.set(false);
@@ -484,7 +483,7 @@ public class YarnClusterClient extends ClusterClient {
stopRunner();
}
}
- if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
+ if (running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
// == if the polling thread is still running but the yarn client is stopped.
LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index 33d5987..f58e6aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.yarn;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -23,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
+
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -37,7 +39,7 @@ import java.util.List;
/**
* Java representation of a running Flink job on YARN.
- * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster,
+ * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster,
* so this class will be used as a client to communicate with yarn and start the job on yarn.
*/
public class YarnClusterClientV2 extends ClusterClient {
@@ -95,7 +97,7 @@ public class YarnClusterClientV2 extends ClusterClient {
if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
appId = report.getApplicationId();
trackingURL = report.getTrackingUrl();
- logAndSysout("Please refer to " + getWebInterfaceURL()
+ logAndSysout("Please refer to " + getWebInterfaceURL()
+ " for the running status of job " + jobGraph.getJobID().toString());
//TODO: not support attach mode now
return new JobSubmissionResult(jobGraph.getJobID());
@@ -112,7 +114,7 @@ public class YarnClusterClientV2 extends ClusterClient {
@Override
public String getWebInterfaceURL() {
// there seems to be a difference between HD 2.2.0 and 2.6.0
- if(!trackingURL.startsWith("http://")) {
+ if (!trackingURL.startsWith("http://")) {
return "http://" + trackingURL;
} else {
return trackingURL;
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5f745b2..db5206a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.yarn;
+package org.apache.flink.yarn;
/**
* Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index e3bd944..b22b163 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.yarn;
+package org.apache.flink.yarn;
/**
* Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
* This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
- * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor<YarnClusterClientV2>.
+ * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor<YarnClusterClientV2>.
* However, in order to use the code in AbstractYarnClusterDescriptor for setting environments and so on, we make YarnClusterDescriptorV2 as now.
*/
public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index 7c9c7a7..03d94fe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -27,9 +27,9 @@ public class YarnConfigKeys {
// Environment variable names
// ------------------------------------------------------------------------
- public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
- public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
- public final static String ENV_APP_ID = "_APP_ID";
+ public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+ public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+ public static final String ENV_APP_ID = "_APP_ID";
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
public static final String ENV_SLOTS = "_SLOTS";
@@ -38,12 +38,12 @@ public class YarnConfigKeys {
public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
- public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
- public final static String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
+ public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+ public static final String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
- public final static String KEYTAB_PATH = "_KEYTAB_PATH";
- public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
- public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
+ public static final String KEYTAB_PATH = "_KEYTAB_PATH";
+ public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+ public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
public static final String ENV_KRB5_PATH = "_KRB5_PATH";
@@ -51,7 +51,7 @@ public class YarnConfigKeys {
// ------------------------------------------------------------------------
- /** Private constructor to prevent instantiation */
+ /** Private constructor to prevent instantiation. */
private YarnConfigKeys() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
index 370df26..9a98519 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
import org.apache.hadoop.yarn.api.records.Container;
import static java.util.Objects.requireNonNull;
@@ -34,7 +35,7 @@ public class YarnContainerInLaunch implements ResourceIDRetrievable {
private final long timestamp;
- /** The resource id associated with this worker type */
+ /** The resource id associated with this worker type. */
private final ResourceID resourceID;
public YarnContainerInLaunch(Container container) {
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 3f4d4f6..2ad9065 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -18,7 +18,6 @@
package org.apache.flink.yarn;
-import akka.actor.ActorSystem;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
@@ -46,9 +45,10 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
+
+import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
import javax.annotation.concurrent.GuardedBy;
@@ -57,32 +57,33 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
+import scala.concurrent.duration.FiniteDuration;
+
/**
* This class is the executable entry point for the YARN Application Master that
* executes a single Flink job and then shuts the YARN application down.
- *
+ *
* <p>The lifetime of the YARN application is bound to that of the Flink job. Other
* YARN Application Master implementations are for example the YARN session.
- *
- * It starts actor system and the actors for {@link JobManagerRunner}
+ *
+ * <p>It starts actor system and the actors for {@link JobManagerRunner}
* and {@link YarnResourceManager}.
*
- * The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
+ * <p>The JobManagerRunner starts a {@link org.apache.flink.runtime.jobmaster.JobMaster}.
* The JobMaster handles Flink job execution, while the YarnResourceManager handles container
* allocation and failure detection.
*/
public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
implements OnCompletionActions, FatalErrorHandler {
- /** Logger */
protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
- /** The job graph file path */
+ /** The job graph file path. */
private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
// ------------------------------------------------------------------------
- /** The lock to guard startup / shutdown / manipulation methods */
+ /** The lock to guard startup / shutdown / manipulation methods. */
private final Object lock = new Object();
@GuardedBy("lock")
@@ -144,7 +145,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
heartbeatServices = HeartbeatServices.fromConfiguration(config);
-
+
metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
// ---- (2) init resource manager -------
@@ -310,7 +311,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
//----------------------------------------------------------------------------------------------
/**
- * Job completion notification triggered by JobManager
+ * Job completion notification triggered by JobManager.
*/
@Override
public void jobFinished(JobExecutionResult result) {
@@ -318,7 +319,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
}
/**
- * Job completion notification triggered by JobManager
+ * Job completion notification triggered by JobManager.
*/
@Override
public void jobFailed(Throwable cause) {
@@ -326,7 +327,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
}
/**
- * Job completion notification triggered by self
+ * Job completion notification triggered by self.
*/
@Override
public void jobFinishedByOther() {
http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 3c85795..4626a7e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,15 +18,12 @@
package org.apache.flink.yarn;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -34,6 +31,8 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
+import akka.actor.ActorRef;
+import akka.actor.Props;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
import org.slf4j.Logger;
import java.lang.reflect.Method;
@@ -66,57 +64,57 @@ import static java.util.Objects.requireNonNull;
*/
public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYarnWorkerNode> {
- /** The heartbeat interval while the resource master is waiting for containers */
+ /** The heartbeat interval while the resource master is waiting for containers. */
private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
- /** The default heartbeat interval during regular operation */
+ /** The default heartbeat interval during regular operation. */
private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
/** Environment variable name of the final container id used by the Flink ResourceManager.
* Container ID generation may vary across Hadoop versions. */
- final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+ static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
- /** The containers where a TaskManager is starting and we are waiting for it to register */
+ /** The containers where a TaskManager is starting and we are waiting for it to register. */
private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
/** Containers we have released, where we are waiting for an acknowledgement that
- * they are released */
+ * they are released. */
private final Map<ContainerId, Container> containersBeingReturned;
- /** The YARN / Hadoop configuration object */
+ /** The YARN / Hadoop configuration object. */
private final YarnConfiguration yarnConfig;
- /** The TaskManager container parameters (like container memory size) */
+ /** The TaskManager container parameters (like container memory size). */
private final ContaineredTaskManagerParameters taskManagerParameters;
- /** Context information used to start a TaskManager Java process */
+ /** Context information used to start a TaskManager Java process. */
private final ContainerLaunchContext taskManagerLaunchContext;
- /** Host name for the container running this process */
+ /** Host name for the container running this process. */
private final String applicationMasterHostName;
- /** Web interface URL, may be null */
+ /** Web interface URL, may be null. */
private final String webInterfaceURL;
- /** Default heartbeat interval between this actor and the YARN ResourceManager */
+ /** Default heartbeat interval between this actor and the YARN ResourceManager. */
private final int yarnHeartbeatIntervalMillis;
- /** Number of failed TaskManager containers before stopping the application. -1 means infinite. */
+ /** Number of failed TaskManager containers before stopping the application. -1 means infinite. */
private final int maxFailedContainers;
- /** Callback handler for the asynchronous resourceManagerClient */
+ /** Callback handler for the asynchronous resourceManagerClient. */
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
- /** Client to communicate with the Resource Manager (YARN's master) */
+ /** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
- /** Client to communicate with the Node manager and launch TaskManager processes */
+ /** Client to communicate with the Node manager and launch TaskManager processes. */
private NMClient nodeManagerClient;
- /** The number of containers requested, but not yet granted */
+ /** The number of containers requested, but not yet granted. */
private int numPendingContainerRequests;
- /** The number of failed containers since the master became active */
+ /** The number of failed containers since the master became active. */
private int failedContainersSoFar;
/** A reference to the reflector to look up previous session containers. */
@@ -428,7 +426,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
}
// ------------------------------------------------------------------------
- // Callbacks from the YARN Resource Manager
+ // Callbacks from the YARN Resource Manager
// ------------------------------------------------------------------------
private void containersAllocated(List<Container> containers) {
@@ -491,7 +489,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
/**
* Invoked when the ResourceManager informs of completed containers.
* Called via an actor message by the callback from the ResourceManager client.
- *
+ *
* @param containers The containers that have completed.
*/
private void containersComplete(List<ContainerStatus> containers) {
@@ -624,8 +622,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
private Logger logger;
private Method method;
- public RegisterApplicationMasterResponseReflector(Logger LOG) {
- this.logger = LOG;
+ public RegisterApplicationMasterResponseReflector(Logger log) {
+ this.logger = log;
try {
method = RegisterApplicationMasterResponse.class
@@ -671,12 +669,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
/**
* Creates the props needed to instantiate this actor.
- *
- * Rather than extracting and validating parameters in the constructor, this factory method takes
+ *
+ * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
* care of that. That way, errors occur synchronously, and are not swallowed simply in a
* failed asynchronous attempt to start the actor.
-
- * @param actorClass
+ *
+ * @param actorClass
* The actor class, to allow overriding this actor with subclasses for testing.
* @param flinkConfig
* The Flink configuration object.
@@ -694,7 +692,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
* The initial number of TaskManagers to allocate.
* @param log
* The logger to log to.
- *
+ *
* @return The Props object to instantiate the YarnFlinkResourceManager actor.
*/
public static Props createActorProps(Class<? extends YarnFlinkResourceManager> actorClass,
@@ -706,8 +704,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
ContaineredTaskManagerParameters taskManagerParameters,
ContainerLaunchContext taskManagerLaunchContext,
int numInitialTaskManagers,
- Logger log)
- {
+ Logger log) {
+
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;