You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/12/02 18:41:53 UTC
svn commit: r1547122 [1/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/proto/server/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/...
Author: arp
Date: Mon Dec 2 17:41:44 2013
New Revision: 1547122
URL: http://svn.apache.org/r1547122
Log:
Merging r1544666 through r1547120 from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
- copied unchanged from r1547120, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java
- copied unchanged from r1547120, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java
- copied unchanged from r1547120, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
- copied unchanged from r1547120, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
- copied unchanged from r1547120, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApp.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/TestWebApp.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServlet.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Mon Dec 2 17:41:44 2013
@@ -117,6 +117,18 @@ Release 2.3.0 - UNRELEASED
YARN-1303. Fixed DistributedShell to not fail with multiple commands separated
by a semi-colon as shell-command. (Xuan Gong via vinodkv)
+ YARN-1423. Support queue placement by secondary group in the Fair Scheduler
+ (Ted Malaska via Sandy Ryza)
+
+ YARN-1314. Fixed DistributedShell to not fail with multiple arguments for a
+ shell command separated by spaces. (Xuan Gong via vinodkv)
+
+ YARN-1239. Modified ResourceManager state-store implementations to start
+ storing version numbers. (Jian He via vinodkv)
+
+ YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
+ queues. (Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
@@ -173,6 +185,9 @@ Release 2.3.0 - UNRELEASED
YARN-1320. Fixed Distributed Shell application to respect custom log4j
properties file. (Xuan Gong via vinodkv)
+ YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
+ tests. (Jian He via vinodkv)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto Mon Dec 2 17:41:44 2013
@@ -103,9 +103,9 @@ enum RMAppStateProto {
message ApplicationStateDataProto {
optional int64 submit_time = 1;
- optional int64 start_time = 2;
- optional ApplicationSubmissionContextProto application_submission_context = 3;
- optional string user = 4;
+ optional ApplicationSubmissionContextProto application_submission_context = 2;
+ optional string user = 3;
+ optional int64 start_time = 4;
optional RMAppStateProto application_state = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 finish_time = 7;
@@ -121,3 +121,8 @@ message ApplicationAttemptStateDataProto
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
}
+
+message RMStateVersionProto {
+ optional int32 major_version = 1;
+ optional int32 minor_version = 2;
+}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Mon Dec 2 17:41:44 2013
@@ -224,6 +224,7 @@ public class ApplicationMaster {
private final String log4jPath = "log4j.properties";
private final String shellCommandPath = "shellCommands";
+ private final String shellArgsPath = "shellArgs";
private volatile boolean done;
private volatile boolean success;
@@ -309,7 +310,6 @@ public class ApplicationMaster {
"App Attempt ID. Not to be used unless for testing purposes");
opts.addOption("shell_script", true,
"Location of the shell script to be executed");
- opts.addOption("shell_args", true, "Command line args for the shell script");
opts.addOption("shell_env", true,
"Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("container_memory", true,
@@ -331,10 +331,10 @@ public class ApplicationMaster {
}
//Check whether customer log4j.properties file exists
- File customerLog4jFile = new File(log4jPath);
- if (customerLog4jFile.exists()) {
+ if (fileExist(log4jPath)) {
try {
- Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, log4jPath);
+ Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
+ log4jPath);
} catch (Exception e) {
LOG.warn("Can not set up custom log4j properties. " + e);
}
@@ -387,24 +387,16 @@ public class ApplicationMaster {
+ appAttemptID.getApplicationId().getClusterTimestamp()
+ ", attemptId=" + appAttemptID.getAttemptId());
- File shellCommandFile = new File(shellCommandPath);
- if (!shellCommandFile.exists()) {
+ if (!fileExist(shellCommandPath)) {
throw new IllegalArgumentException(
"No shell command specified to be executed by application master");
}
- FileInputStream fs = null;
- DataInputStream ds = null;
- try {
- ds = new DataInputStream(new FileInputStream(shellCommandFile));
- shellCommand = ds.readUTF();
- } finally {
- org.apache.commons.io.IOUtils.closeQuietly(ds);
- org.apache.commons.io.IOUtils.closeQuietly(fs);
- }
+ shellCommand = readContent(shellCommandPath);
- if (cliParser.hasOption("shell_args")) {
- shellArgs = cliParser.getOptionValue("shell_args");
+ if (fileExist(shellArgsPath)) {
+ shellArgs = readContent(shellArgsPath);
}
+
if (cliParser.hasOption("shell_env")) {
String shellEnvs[] = cliParser.getOptionValues("shell_env");
for (String env : shellEnvs) {
@@ -922,4 +914,18 @@ public class ApplicationMaster {
LOG.info("Requested container ask: " + request.toString());
return request;
}
+
+ private boolean fileExist(String filePath) {
+ return new File(filePath).exists();
+ }
+
+ private String readContent(String filePath) throws IOException {
+ DataInputStream ds = null;
+ try {
+ ds = new DataInputStream(new FileInputStream(filePath));
+ return ds.readUTF();
+ } finally {
+ org.apache.commons.io.IOUtils.closeQuietly(ds);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Mon Dec 2 17:41:44 2013
@@ -30,9 +30,11 @@ import java.util.Vector;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -137,7 +139,7 @@ public class Client {
// Location of shell script
private String shellScriptPath = "";
// Args to be passed to the shell command
- private String shellArgs = "";
+ private String[] shellArgs = new String[] {};
// Env variables to be setup for the shell command
private Map<String, String> shellEnv = new HashMap<String, String>();
// Shell Command Container priority
@@ -166,6 +168,8 @@ public class Client {
private Options opts;
private final String shellCommandPath = "shellCommands";
+ private final String shellArgsPath = "shellArgs";
+ private final String appMasterJarPath = "AppMaster.jar";
// Hardcoded path to custom log_properties
private final String log4jPath = "log4j.properties";
@@ -223,7 +227,9 @@ public class Client {
opts.addOption("jar", true, "Jar file containing the application master");
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
opts.addOption("shell_script", true, "Location of the shell script to be executed");
- opts.addOption("shell_args", true, "Command line args for the shell script");
+ opts.addOption("shell_args", true, "Command line args for the shell script." +
+ "Multiple args can be separated by empty space.");
+ opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
@@ -311,7 +317,7 @@ public class Client {
shellScriptPath = cliParser.getOptionValue("shell_script");
}
if (cliParser.hasOption("shell_args")) {
- shellArgs = cliParser.getOptionValue("shell_args");
+ shellArgs = cliParser.getOptionValues("shell_args");
}
if (cliParser.hasOption("shell_env")) {
String envs[] = cliParser.getOptionValues("shell_env");
@@ -440,43 +446,13 @@ public class Client {
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
- Path src = new Path(appMasterJar);
- String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
- Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
- fs.copyFromLocalFile(false, true, src, dst);
- FileStatus destStatus = fs.getFileStatus(dst);
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-
- // Set the type of resource - file or archive
- // archives are untarred at destination
- // we don't need the jar file to be untarred for now
- amJarRsrc.setType(LocalResourceType.FILE);
- // Set visibility of the resource
- // Setting to most private option
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- // Set the resource to be copied over
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
- // Set timestamp and length of file so that the framework
- // can do basic sanity checks for the local resource
- // after it has been copied over to ensure it is the same
- // resource the client intended to use with the application
- amJarRsrc.setTimestamp(destStatus.getModificationTime());
- amJarRsrc.setSize(destStatus.getLen());
- localResources.put("AppMaster.jar", amJarRsrc);
+ addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.getId(),
+ localResources, null);
// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
- Path log4jSrc = new Path(log4jPropFile);
- String log4jPathSuffix = appName + "/" + appId.getId() + "/" + log4jPath;
- Path log4jDst = new Path(fs.getHomeDirectory(), log4jPathSuffix);
- fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
- FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
- LocalResource log4jRsrc =
- LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
- log4jFileStatus.getLen(), log4jFileStatus.getModificationTime());
- localResources.put(log4jPath, log4jRsrc);
+ addToLocalResources(fs, log4jPropFile, log4jPath, appId.getId(),
+ localResources, null);
}
// The shell script has to be made available on the final container(s)
@@ -500,25 +476,13 @@ public class Client {
}
if (!shellCommand.isEmpty()) {
- String shellCommandSuffix =
- appName + "/" + appId.getId() + "/" + shellCommandPath;
- Path shellCommandDst =
- new Path(fs.getHomeDirectory(), shellCommandSuffix);
- FSDataOutputStream ostream = null;
- try {
- ostream = FileSystem
- .create(fs, shellCommandDst, new FsPermission((short) 0710));
- ostream.writeUTF(shellCommand);
- } finally {
- IOUtils.closeQuietly(ostream);
- }
- FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
- LocalResource scRsrc =
- LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromURI(shellCommandDst.toUri()),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
- scFileStatus.getLen(), scFileStatus.getModificationTime());
- localResources.put(shellCommandPath, scRsrc);
+ addToLocalResources(fs, null, shellCommandPath, appId.getId(),
+ localResources, shellCommand);
+ }
+
+ if (shellArgs.length > 0) {
+ addToLocalResources(fs, null, shellArgsPath, appId.getId(),
+ localResources, StringUtils.join(shellArgs, " "));
}
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
@@ -579,9 +543,6 @@ public class Client {
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
- if (!shellArgs.isEmpty()) {
- vargs.add("--shell_args " + shellArgs + "");
- }
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
@@ -750,4 +711,31 @@ public class Client {
yarnClient.killApplication(appId);
}
+ private void addToLocalResources(FileSystem fs, String fileSrcPath,
+ String fileDstPath, int appId, Map<String, LocalResource> localResources,
+ String resources) throws IOException {
+ String suffix =
+ appName + "/" + appId + "/" + fileDstPath;
+ Path dst =
+ new Path(fs.getHomeDirectory(), suffix);
+ if (fileSrcPath == null) {
+ FSDataOutputStream ostream = null;
+ try {
+ ostream = FileSystem
+ .create(fs, dst, new FsPermission((short) 0710));
+ ostream.writeUTF(resources);
+ } finally {
+ IOUtils.closeQuietly(ostream);
+ }
+ } else {
+ fs.copyFromLocalFile(new Path(fileSrcPath), dst);
+ }
+ FileStatus scFileStatus = fs.getFileStatus(dst);
+ LocalResource scRsrc =
+ LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromURI(dst.toUri()),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ scFileStatus.getLen(), scFileStatus.getModificationTime());
+ localResources.put(fileDstPath, scRsrc);
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Mon Dec 2 17:41:44 2013
@@ -270,6 +270,40 @@ public class TestDistributedShell {
}
@Test(timeout=90000)
+ public void testDSShellWithMultipleArgs() throws Exception {
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "4",
+ "--shell_command",
+ "echo",
+ "--shell_args",
+ "HADOOP YARN MAPREDUCE HDFS",
+ "--master_memory",
+ "512",
+ "--master_vcores",
+ "2",
+ "--container_memory",
+ "128",
+ "--container_vcores",
+ "1"
+ };
+
+ LOG.info("Initializing DS Client");
+ final Client client =
+ new Client(new Configuration(yarnCluster.getConfig()));
+ boolean initSuccess = client.init(args);
+ Assert.assertTrue(initSuccess);
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+ LOG.info("Client run completed. Result=" + result);
+ List<String> expectedContent = new ArrayList<String>();
+ expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
+ verifyContainerLog(4, expectedContent, false, "");
+ }
+
+ @Test(timeout=90000)
public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApp.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApp.java Mon Dec 2 17:41:44 2013
@@ -83,11 +83,13 @@ public abstract class WebApp extends Ser
* @return InetSocketAddress
*/
public InetSocketAddress getListenerAddress() {
- return checkNotNull(httpServer, "httpServer").getListenerAddress();
+ return checkNotNull(httpServer, "httpServer").getConnectorAddress(0);
}
public int port() {
- return checkNotNull(httpServer, "httpServer").getPort();
+ InetSocketAddress addr = checkNotNull(httpServer, "httpServer")
+ .getConnectorAddress(0);
+ return addr == null ? -1 : addr.getPort();
}
public void stop() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java Mon Dec 2 17:41:44 2013
@@ -22,6 +22,7 @@ import static com.google.common.base.Pre
import java.io.IOException;
import java.net.ConnectException;
+import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
@@ -36,7 +37,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,46 +216,34 @@ public class WebApps {
System.exit(1);
}
}
- HttpServer server =
- new HttpServer(name, bindAddress, port, findPort, conf,
- new AdminACLsManager(conf).getAdminAcl(), null,
- pathList.toArray(new String[0])) {
-
- {
- if (UserGroupInformation.isSecurityEnabled()) {
- boolean initSpnego = true;
- if (spnegoPrincipalKey == null
- || conf.get(spnegoPrincipalKey, "").isEmpty()) {
- LOG.warn("Principal for spnego filter is not set");
- initSpnego = false;
- }
- if (spnegoKeytabKey == null
- || conf.get(spnegoKeytabKey, "").isEmpty()) {
- LOG.warn("Keytab for spnego filter is not set");
- initSpnego = false;
- }
- if (initSpnego) {
- LOG.info("Initializing spnego filter with principal key : "
- + spnegoPrincipalKey + " keytab key : "
- + spnegoKeytabKey);
- initSpnego(conf, spnegoPrincipalKey, spnegoKeytabKey);
- }
- }
- }
- };
+ HttpServer.Builder builder = new HttpServer.Builder().setName(name)
+ .addEndpoint(URI.create("http://" + bindAddress + ":" + port))
+ .setConf(conf).setFindPort(findPort)
+ .setACL(new AdminACLsManager(conf).getAdminAcl())
+ .setPathSpec(pathList.toArray(new String[0]));
+
+ boolean hasSpnegoConf = spnegoPrincipalKey != null
+ && spnegoKeytabKey != null;
+ if (hasSpnegoConf) {
+ builder.setUsernameConfKey(conf.get(spnegoPrincipalKey))
+ .setKeytabConfKey(conf.get(spnegoKeytabKey))
+ .setSecurityEnabled(UserGroupInformation.isSecurityEnabled());
+ }
+ HttpServer server = builder.build();
+
for(ServletStruct struct: servlets) {
server.addServlet(struct.name, struct.spec, struct.clazz);
}
for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue());
}
- server.defineFilter(server.getWebAppContext(), "guice",
+ HttpServer.defineFilter(server.getWebAppContext(), "guice",
GuiceFilter.class.getName(), null, new String[] { "/*" });
webapp.setConf(conf);
webapp.setHttpServer(server);
server.start();
- LOG.info("Web app /"+ name +" started at "+ server.getPort());
+ LOG.info("Web app /"+ name +" started at "+ server.getConnectorAddress(0).getPort());
} catch (ClassNotFoundException e) {
throw new WebAppException("Error starting http server", e);
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/TestWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/TestWebApp.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/TestWebApp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/TestWebApp.java Mon Dec 2 17:41:44 2013
@@ -33,17 +33,6 @@ import static org.junit.Assert.assertTru
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.ext.ContextResolver;
-import javax.ws.rs.ext.Provider;
-import javax.xml.bind.JAXBContext;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.yarn.MockApps;
@@ -55,9 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
public class TestWebApp {
static final Logger LOG = LoggerFactory.getLogger(TestWebApp.class);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Mon Dec 2 17:41:44 2013
@@ -69,7 +69,7 @@ public class WebServer extends AbstractS
.withHttpSpnegoKeytabKey(
YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.start(this.nmWebApp);
- this.port = this.webApp.httpServer().getPort();
+ this.port = this.webApp.httpServer().getConnectorAddress(0).getPort();
} catch (Exception e) {
String msg = "NMWebapps failed to start.";
LOG.error(msg, e);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Dec 2 17:41:44 2013
@@ -457,6 +457,7 @@ public class ResourceManager extends Com
if(recoveryEnabled) {
try {
+ rmStore.checkVersion();
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Mon Dec 2 17:41:44 2013
@@ -44,9 +44,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +66,9 @@ public class FileSystemRMStateStore exte
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
- private static final String ROOT_DIR_NAME = "FSRMStateRoot";
+ protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
+ protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
+ .newInstance(1, 0);
protected FileSystem fs;
@@ -78,7 +83,6 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void initInternal(Configuration conf)
throws Exception{
-
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
@@ -101,6 +105,36 @@ public class FileSystemRMStateStore exte
}
@Override
+ protected RMStateVersion getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ @Override
+ protected synchronized RMStateVersion loadVersion() throws Exception {
+ Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
+ if (fs.exists(versionNodePath)) {
+ FileStatus status = fs.getFileStatus(versionNodePath);
+ byte[] data = readFile(versionNodePath, status.getLen());
+ RMStateVersion version =
+ new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+ return version;
+ }
+ return null;
+ }
+
+ @Override
+ protected synchronized void storeVersion() throws Exception {
+ Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
+ byte[] data =
+ ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ if (fs.exists(versionNodePath)) {
+ updateFile(versionNodePath, data);
+ } else {
+ writeFile(versionNodePath, data);
+ }
+ }
+
+ @Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
@@ -430,7 +464,7 @@ public class FileSystemRMStateStore exte
fs.rename(tempPath, outputPath);
}
- private void updateFile(Path outputPath, byte[] data) throws Exception {
+ protected void updateFile(Path outputPath, byte[] data) throws Exception {
if (fs.exists(outputPath)) {
deleteFile(outputPath);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Mon Dec 2 17:41:44 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -43,12 +44,15 @@ import com.google.common.annotations.Vis
public class MemoryRMStateStore extends RMStateStore {
RMState state = new RMState();
-
@VisibleForTesting
public RMState getState() {
return state;
}
-
+
+ @Override
+ public void checkVersion() throws Exception {
+ }
+
@Override
public synchronized RMState loadState() throws Exception {
// return a copy of the state to allow for modification of the real state
@@ -224,4 +228,18 @@ public class MemoryRMStateStore extends
state.rmSecretManagerState.getMasterKeyState();
rmDTMasterKeyState.remove(delegationKey);
}
+
+ @Override
+ protected RMStateVersion loadVersion() throws Exception {
+ return null;
+ }
+
+ @Override
+ protected void storeVersion() throws Exception {
+ }
+
+ @Override
+ protected RMStateVersion getCurrentVersion() {
+ return null;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Mon Dec 2 17:41:44 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -99,6 +100,27 @@ public class NullRMStateStore extends RM
@Override
protected void updateApplicationAttemptStateInternal(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ }
+
+ @Override
+ public void checkVersion() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ protected RMStateVersion loadVersion() throws Exception {
+ // Do nothing
+ return null;
+ }
+
+ @Override
+ protected void storeVersion() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ protected RMStateVersion getCurrentVersion() {
// Do nothing
+ return null;
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Dec 2 17:41:44 2013
@@ -43,18 +43,18 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -78,6 +78,7 @@ public abstract class RMStateStore exten
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
+ protected static final String VERSION_NODE = "RMVersionNode";
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
@@ -304,7 +305,54 @@ public abstract class RMStateStore exten
* after this
*/
protected abstract void closeInternal() throws Exception;
-
+
+ /**
+ * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+ * 2) Any incompatible change of state-store is a major upgrade, and any
+ * compatible change of state-store is a minor upgrade.
+ * 3) If theres's no version, treat it as 1.0.
+ * 4) Within a minor upgrade, say 1.1 to 1.2:
+ * overwrite the version info and proceed as normal.
+ * 5) Within a major upgrade, say 1.2 to 2.0:
+ * throw exception and indicate user to use a separate upgrade tool to
+ * upgrade RM state.
+ */
+ public void checkVersion() throws Exception {
+ RMStateVersion loadedVersion = loadVersion();
+ LOG.info("Loaded RM state version info " + loadedVersion);
+ if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ // if there is no version info, treat it as 1.0;
+ if (loadedVersion == null) {
+ loadedVersion = RMStateVersion.newInstance(1, 0);
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+ LOG.info("Storing RM state version info " + getCurrentVersion());
+ storeVersion();
+ } else {
+ throw new RMStateVersionIncompatibleException(
+ "Expecting RM state version " + getCurrentVersion()
+ + ", but loading version " + loadedVersion);
+ }
+ }
+
+ /**
+ * Derived class use this method to load the version information from state
+ * store.
+ */
+ protected abstract RMStateVersion loadVersion() throws Exception;
+
+ /**
+ * Derived class use this method to store the version information.
+ */
+ protected abstract void storeVersion() throws Exception;
+
+ /**
+ * Get the current version of the underlying state store.
+ */
+ protected abstract RMStateVersion getCurrentVersion();
+
/**
* Blocking API
* The derived class must recover state from the store and return a new
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Mon Dec 2 17:41:44 2013
@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -41,16 +40,18 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -64,9 +65,9 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
@Private
@Unstable
@@ -74,7 +75,9 @@ public class ZKRMStateStore extends RMSt
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
- private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+ protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+ protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
+ .newInstance(1, 0);
private int numRetries;
private String zkHostPort = null;
@@ -302,6 +305,36 @@ public class ZKRMStateStore extends RMSt
}
@Override
+ protected RMStateVersion getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ @Override
+ protected synchronized void storeVersion() throws Exception {
+ String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
+ byte[] data =
+ ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ if (zkClient.exists(versionNodePath, true) != null) {
+ setDataWithRetries(versionNodePath, data, -1);
+ } else {
+ createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
+ }
+ }
+
+ @Override
+ protected synchronized RMStateVersion loadVersion() throws Exception {
+ String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
+
+ if (zkClient.exists(versionNodePath, true) != null) {
+ byte[] data = getDataWithRetries(versionNodePath, true);
+ RMStateVersion version =
+ new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+ return version;
+ }
+ return null;
+ }
+
+ @Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Dec 2 17:41:44 2013
@@ -130,7 +130,7 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
- RMAppEventType.START, new RMAppSavingTransition())
+ RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
@@ -215,7 +215,8 @@ public class RMAppImpl implements RMApp,
new AttemptFinishedAtFinalSavingTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL))
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
+ RMAppEventType.APP_NEW_SAVED))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -760,7 +761,7 @@ public class RMAppImpl implements RMApp,
return msg;
}
- private static final class RMAppSavingTransition extends RMAppTransition {
+ private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Dec 2 17:41:44 2013
@@ -334,6 +334,7 @@ public class RMAppAttemptImpl implements
// Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL))
// Transitions from FAILED State
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Dec 2 17:41:44 2013
@@ -51,7 +51,6 @@ public class AppSchedulable extends Sche
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
- private boolean runnable = false; // everyone starts as not runnable
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
@@ -61,7 +60,7 @@ public class AppSchedulable extends Sche
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
- this.startTime = System.currentTimeMillis();
+ this.startTime = scheduler.getClock().getTime();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
@@ -139,18 +138,6 @@ public class AppSchedulable extends Sche
}
/**
- * Is this application runnable? Runnable means that the user and queue
- * application counts are within configured quotas.
- */
- public boolean getRunnable() {
- return runnable;
- }
-
- public void setRunnable(boolean runnable) {
- this.runnable = runnable;
- }
-
- /**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* priority.
@@ -281,9 +268,6 @@ public class AppSchedulable extends Sche
unreserve(priority, node);
return Resources.none();
}
- } else {
- // If this app is over quota, don't schedule anything
- if (!(getRunnable())) { return Resources.none(); }
}
Collection<Priority> prioritiesToTry = (reserved) ?
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Mon Dec 2 17:41:44 2013
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -42,7 +41,9 @@ public class FSLeafQueue extends FSQueue
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
- private final List<AppSchedulable> appScheds =
+ private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
+ new ArrayList<AppSchedulable>();
+ private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>();
private final FairScheduler scheduler;
@@ -62,29 +63,51 @@ public class FSLeafQueue extends FSQueue
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
- public void addApp(FSSchedulerApp app) {
+ public void addApp(FSSchedulerApp app, boolean runnable) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
- appScheds.add(appSchedulable);
+ if (runnable) {
+ runnableAppScheds.add(appSchedulable);
+ } else {
+ nonRunnableAppScheds.add(appSchedulable);
+ }
}
// for testing
void addAppSchedulable(AppSchedulable appSched) {
- appScheds.add(appSched);
+ runnableAppScheds.add(appSched);
}
- public void removeApp(FSSchedulerApp app) {
- for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
- AppSchedulable appSched = it.next();
- if (appSched.getApp() == app) {
- it.remove();
- break;
- }
+ /**
+ * Removes the given app from this queue.
+ * @return whether or not the app was runnable
+ */
+ public boolean removeApp(FSSchedulerApp app) {
+ if (runnableAppScheds.remove(app.getAppSchedulable())) {
+ return true;
+ } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
+ return false;
+ } else {
+ throw new IllegalStateException("Given app to remove " + app +
+ " does not exist in queue " + this);
}
}
- public Collection<AppSchedulable> getAppSchedulables() {
- return appScheds;
+ public void makeAppRunnable(AppSchedulable appSched) {
+ if (!nonRunnableAppScheds.remove(appSched)) {
+ throw new IllegalStateException("Can't make app runnable that does not " +
+ "already exist in queue as non-runnable" + appSched);
+ }
+
+ runnableAppScheds.add(appSched);
+ }
+
+ public Collection<AppSchedulable> getRunnableAppSchedulables() {
+ return runnableAppScheds;
+ }
+
+ public List<AppSchedulable> getNonRunnableAppSchedulables() {
+ return nonRunnableAppScheds;
}
@Override
@@ -98,7 +121,7 @@ public class FSLeafQueue extends FSQueue
@Override
public void recomputeShares() {
- policy.computeShares(getAppSchedulables(), getFairShare());
+ policy.computeShares(getRunnableAppSchedulables(), getFairShare());
}
@Override
@@ -109,7 +132,10 @@ public class FSLeafQueue extends FSQueue
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
- for (AppSchedulable app : appScheds) {
+ for (AppSchedulable app : runnableAppScheds) {
+ Resources.addTo(usage, app.getResourceUsage());
+ }
+ for (AppSchedulable app : nonRunnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
@@ -121,25 +147,35 @@ public class FSLeafQueue extends FSQueue
// Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0);
- for (AppSchedulable sched : appScheds) {
- sched.updateDemand();
- Resource toAdd = sched.getDemand();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
- + "; Total resource consumption for " + getName() + " now "
- + demand);
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (Resources.equals(demand, maxRes)) {
+ break;
}
- demand = Resources.add(demand, toAdd);
- demand = Resources.componentwiseMin(demand, maxRes);
+ updateDemandForApp(sched, maxRes);
+ }
+ for (AppSchedulable sched : nonRunnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
break;
}
+ updateDemandForApp(sched, maxRes);
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes);
}
}
+
+ private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
+ sched.updateDemand();
+ Resource toAdd = sched.getDemand();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ + "; Total resource consumption for " + getName() + " now "
+ + demand);
+ }
+ demand = Resources.add(demand, toAdd);
+ demand = Resources.componentwiseMin(demand, maxRes);
+ }
@Override
public Resource assignContainer(FSSchedulerNode node) {
@@ -153,17 +189,15 @@ public class FSLeafQueue extends FSQueue
}
Comparator<Schedulable> comparator = policy.getComparator();
- Collections.sort(appScheds, comparator);
- for (AppSchedulable sched : appScheds) {
- if (sched.getRunnable()) {
- if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
- continue;
- }
-
- assigned = sched.assignContainer(node);
- if (!assigned.equals(Resources.none())) {
- break;
- }
+ Collections.sort(runnableAppScheds, comparator);
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
+ continue;
+ }
+
+ assigned = sched.assignContainer(node);
+ if (!assigned.equals(Resources.none())) {
+ break;
}
}
return assigned;
@@ -205,4 +239,9 @@ public class FSLeafQueue extends FSQueue
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
+
+ @Override
+ public int getNumRunnableApps() {
+ return runnableAppScheds.size();
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Mon Dec 2 17:41:44 2013
@@ -43,6 +43,7 @@ public class FSParentQueue extends FSQue
new ArrayList<FSQueue>();
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
+ private int runnableApps;
public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) {
@@ -171,4 +172,17 @@ public class FSParentQueue extends FSQue
}
super.policy = policy;
}
+
+ public void incrementRunnableApps() {
+ runnableApps++;
+ }
+
+ public void decrementRunnableApps() {
+ runnableApps--;
+ }
+
+ @Override
+ public int getNumRunnableApps() {
+ return runnableApps;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Dec 2 17:41:44 2013
@@ -72,6 +72,10 @@ public abstract class FSQueue extends Sc
public SchedulingPolicy getPolicy() {
return policy;
}
+
+ public FSParentQueue getParent() {
+ return parent;
+ }
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
throws AllocationConfigurationException {
@@ -165,6 +169,12 @@ public abstract class FSQueue extends Sc
public abstract Collection<FSQueue> getChildQueues();
/**
+ * Return the number of apps for which containers can be allocated.
+ * Includes apps in subqueues.
+ */
+ public abstract int getNumRunnableApps();
+
+ /**
* Helper method to check if the queue should attempt assigning resources
*
* @return true if check passes (can assign) or false otherwise
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Mon Dec 2 17:41:44 2013
@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -62,7 +61,7 @@ public class FSSchedulerApp extends Sche
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
@@ -327,4 +326,9 @@ public class FSSchedulerApp extends Sche
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
+
+ @Override
+ public FSLeafQueue getQueue() {
+ return (FSLeafQueue)super.getQueue();
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Dec 2 17:41:44 2013
@@ -190,9 +190,13 @@ public class FairScheduler implements Re
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
+ @VisibleForTesting
+ final MaxRunningAppsEnforcer maxRunningEnforcer;
+
public FairScheduler() {
clock = new SystemClock();
queueMgr = new QueueManager(this);
+ maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
}
private void validateConf(Configuration conf) {
@@ -272,7 +276,6 @@ public class FairScheduler implements Re
*/
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
- updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@@ -377,7 +380,7 @@ public class FairScheduler implements Re
for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as : sched.getAppSchedulables()) {
+ for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@@ -505,63 +508,23 @@ public class FairScheduler implements Re
return resToPreempt;
}
- /**
- * This updates the runnability of all apps based on whether or not any
- * users/queues have exceeded their capacity.
- */
- private void updateRunnability() {
- List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
-
- // Start by marking everything as not runnable
- for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
- for (AppSchedulable a : leafQueue.getAppSchedulables()) {
- a.setRunnable(false);
- apps.add(a);
- }
- }
- // Create a list of sorted jobs in order of start time and priority
- Collections.sort(apps, new FifoAppComparator());
- // Mark jobs as runnable in order of start time and priority, until
- // user or queue limits have been reached.
- Map<String, Integer> userApps = new HashMap<String, Integer>();
- Map<String, Integer> queueApps = new HashMap<String, Integer>();
-
- for (AppSchedulable app : apps) {
- String user = app.getApp().getUser();
- String queue = app.getApp().getQueueName();
- int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
- int queueCount = queueApps.containsKey(queue) ? queueApps.get(queue) : 0;
- if (userCount < queueMgr.getUserMaxApps(user) &&
- queueCount < queueMgr.getQueueMaxApps(queue)) {
- userApps.put(user, userCount + 1);
- queueApps.put(queue, queueCount + 1);
- app.setRunnable(true);
- }
- }
- }
-
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
// synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
- if (!app.getRunnable()) {
- // Job won't launch tasks, but don't return 0 to avoid division errors
- return ResourceWeights.NEUTRAL;
- } else {
- double weight = 1.0;
- if (sizeBasedWeight) {
- // Set weight based on current memory demand
- weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
- }
- weight *= app.getPriority().getPriority();
- if (weightAdjuster != null) {
- // Run weight through the user-supplied weightAdjuster
- weight = weightAdjuster.adjustWeight(app, weight);
- }
- return new ResourceWeights((float)weight);
+ double weight = 1.0;
+ if (sizeBasedWeight) {
+ // Set weight based on current memory demand
+ weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
+ }
+ weight *= app.getPriority().getPriority();
+ if (weightAdjuster != null) {
+ // Run weight through the user-supplied weightAdjuster
+ weight = weightAdjuster.adjustWeight(app, weight);
}
+ return new ResourceWeights((float)weight);
}
@Override
@@ -662,7 +625,14 @@ public class FairScheduler implements Re
return;
}
- queue.addApp(schedulerApp);
+ boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ queue.addApp(schedulerApp, runnable);
+ if (runnable) {
+ maxRunningEnforcer.trackRunnableApp(schedulerApp);
+ } else {
+ maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+ }
+
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
@@ -736,8 +706,14 @@ public class FairScheduler implements Re
// Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName(), false);
- queue.removeApp(application);
+ boolean wasRunnable = queue.removeApp(application);
+ if (wasRunnable) {
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+ } else {
+ maxRunningEnforcer.untrackNonRunnableApp(application);
+ }
+
// Remove from our data-structure
applications.remove(applicationAttemptId);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Dec 2 17:41:44 2013
@@ -89,7 +89,8 @@ public class QueueManager {
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
- private volatile QueueManagerInfo info = new QueueManagerInfo();
+ @VisibleForTesting
+ volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting
volatile QueuePlacementPolicy placementPolicy;
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Mon Dec 2 17:41:44 2013
@@ -39,6 +39,8 @@ public class QueuePlacementPolicy {
new HashMap<String, Class<? extends QueuePlacementRule>>();
map.put("user", QueuePlacementRule.User.class);
map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+ map.put("secondaryGroupExistingQueue",
+ QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class);
map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class);