You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by su...@apache.org on 2013/01/24 23:44:12 UTC
svn commit: r1438243 - in
/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/...
Author: suresh
Date: Thu Jan 24 22:44:10 2013
New Revision: 1438243
URL: http://svn.apache.org/viewvc?rev=1438243&view=rev
Log:
Merge changes from trunk
Added:
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
- copied unchanged from r1438240, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
- copied from r1438240, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
- copied unchanged from r1438240, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/CHANGES.txt?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/CHANGES.txt Thu Jan 24 22:44:10 2013
@@ -37,6 +37,9 @@ Release 2.0.3-alpha - Unreleased
YARN-328. Use token request messages defined in hadoop common. (suresh)
+ YARN-231. RM Restart - Add FS-based persistent store implementation for
+ RMStateStore (Bikas Saha via hitesh)
+
IMPROVEMENTS
YARN-223. Update process tree instead of getting new process trees.
@@ -101,6 +104,9 @@ Release 2.0.3-alpha - Unreleased
YARN-331. Fill in missing fair scheduler documentation. (sandyr via tucu)
+ YARN-277. Use AMRMClient in DistributedShell to exemplify the approach.
+ (Bikas Saha via hitesh)
+
OPTIMIZATIONS
BUG FIXES
@@ -202,6 +208,10 @@ Release 2.0.3-alpha - Unreleased
unregistered on App-finish. (vinodkv via sseth)
YARN-302. Fair scheduler assignmultiple should default to false. (sandyr via tucu)
+
+ YARN-319. Submitting a job to a fair scheduler queue for which the user
+ does not have permission causes the client to wait forever.
+ (shenhong via tomwhite)
Release 2.0.2-alpha - 2012-09-07
@@ -325,6 +335,9 @@ Release 0.23.6 - UNRELEASED
YARN-334. Maven RAT plugin is not checking all source files (tgraves)
+ YARN-354. WebAppProxyServer exits immediately after startup (Liang Xie via
+ jlowe)
+
Release 0.23.5 - 2012-11-28
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java Thu Jan 24 22:44:10 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.classification.
public abstract class ApplicationAttemptId implements
Comparable<ApplicationAttemptId> {
+ public static final String appAttemptIdStrPrefix = "appattempt_";
+
/**
* Get the <code>ApplicationId</code> of the <code>ApplicationAttempId</code>.
* @return <code>ApplicationId</code> of the <code>ApplicationAttempId</code>
@@ -111,11 +113,11 @@ public abstract class ApplicationAttempt
@Override
public String toString() {
- StringBuilder sb = new StringBuilder("appattempt_");
+ StringBuilder sb = new StringBuilder(appAttemptIdStrPrefix);
sb.append(this.getApplicationId().getClusterTimestamp()).append("_");
sb.append(ApplicationId.appIdFormat.get().format(
this.getApplicationId().getId()));
sb.append("_").append(attemptIdFormat.get().format(getAttemptId()));
return sb.toString();
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java Thu Jan 24 22:44:10 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.classification.
@Stable
public abstract class ApplicationId implements Comparable<ApplicationId> {
+ public static final String appIdStrPrefix = "application_";
+
/**
* Get the short integer identifier of the <code>ApplicationId</code>
* which is unique for all applications started by a particular instance
@@ -88,7 +90,7 @@ public abstract class ApplicationId impl
@Override
public String toString() {
- return "application_" + this.getClusterTimestamp() + "_"
+ return appIdStrPrefix + this.getClusterTimestamp() + "_"
+ appIdFormat.get().format(getId());
}
@@ -119,4 +121,4 @@ public abstract class ApplicationId impl
return false;
return true;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Thu Jan 24 22:44:10 2013
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
@@ -51,9 +50,6 @@ import org.apache.hadoop.yarn.api.Contai
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -71,6 +67,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -78,37 +77,64 @@ import org.apache.hadoop.yarn.util.Conve
import org.apache.hadoop.yarn.util.Records;
/**
- * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework.
+ * An ApplicationMaster for executing shell commands on a set of launched
+ * containers using the YARN framework.
*
- * <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
+ * <p>
+ * This class is meant to act as an example on how to write yarn-based
+ * application masters.
+ * </p>
*
- * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher.
- * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with
- * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
- * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
- * as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
+ * <p>
+ * The ApplicationMaster is started on a container by the
+ * <code>ResourceManager</code>'s launcher. The first thing that the
+ * <code>ApplicationMaster</code> needs to do is to connect and register itself
+ * with the <code>ResourceManager</code>. The registration sets up information
+ * within the <code>ResourceManager</code> regarding what host:port the
+ * ApplicationMaster is listening on to provide any form of functionality to a
+ * client as well as a tracking url that a client can use to keep track of
+ * status/job history if needed.
+ * </p>
*
- * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
- * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the
- * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat.
+ * <p>
+ * The <code>ApplicationMaster</code> needs to send a heartbeat to the
+ * <code>ResourceManager</code> at regular intervals to inform the
+ * <code>ResourceManager</code> that it is up and alive. The
+ * {@link AMRMProtocol#allocate} to the <code>ResourceManager</code> from the
+ * <code>ApplicationMaster</code> acts as a heartbeat.
*
- * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the
- * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
- * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
- * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code>
- * of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
+ * <p>
+ * For the actual handling of the job, the <code>ApplicationMaster</code> has to
+ * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
+ * required no. of containers using {@link ResourceRequest} with the necessary
+ * resource specifications such as node location, computational
+ * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
+ * responds with an {@link AllocateResponse} that informs the
+ * <code>ApplicationMaster</code> of the set of newly allocated containers,
+ * completed containers as well as current state of available resources.
+ * </p>
*
- * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via
- * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable,
- * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest}
- * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
- *
- * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code>
- * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager}
- * by querying for the status of the allocated container's {@link ContainerId}.
+ * <p>
+ * For each allocated container, the <code>ApplicationMaster</code> can then set
+ * up the necessary launch context via {@link ContainerLaunchContext} to specify
+ * the allocated container id, local resources required by the executable, the
+ * environment to be setup for the executable, commands to execute, etc. and
+ * submit a {@link StartContainerRequest} to the {@link ContainerManager} to
+ * launch and execute the defined commands on the given allocated container.
+ * </p>
*
- * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest}
- * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed.
+ * <p>
+ * The <code>ApplicationMaster</code> can monitor the launched container by
+ * either querying the <code>ResourceManager</code> using
+ * {@link AMRMProtocol#allocate} to get updates on completed containers or via
+ * the {@link ContainerManager} by querying for the status of the allocated
+ * container's {@link ContainerId}.
+ *
+ * <p>
+ * After the job has been completed, the <code>ApplicationMaster</code> has to
+ * send a {@link FinishApplicationMasterRequest} to the
+ * <code>ResourceManager</code> to inform it that the
+ * <code>ApplicationMaster</code> has been completed.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -116,61 +142,58 @@ public class ApplicationMaster {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
- // Configuration
+ // Configuration
private Configuration conf;
// YARN RPC to communicate with the Resource Manager or Node Manager
private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMProtocol resourceManager;
+ private AMRMClient resourceManager;
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
// TODO
// For status update for clients - yet to be implemented
- // Hostname of the container
+ // Hostname of the container
private String appMasterHostname = "";
- // Port on which the app master listens for status update requests from clients
+ // Port on which the app master listens for status updates from clients
private int appMasterRpcPort = 0;
- // Tracking url to which app master publishes info for clients to monitor
+ // Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
// App Master configuration
// No. of containers to run shell command on
private int numTotalContainers = 1;
- // Memory to request for the container on which the shell command will run
+ // Memory to request for the container on which the shell command will run
private int containerMemory = 10;
// Priority of the request
- private int requestPriority;
-
- // Incremental counter for rpc calls to the RM
- private AtomicInteger rmRequestID = new AtomicInteger();
+ private int requestPriority;
// Simple flag to denote whether all works is done
- private boolean appDone = false;
+ private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
// allocated to us
private AtomicInteger numAllocatedContainers = new AtomicInteger();
- // Count of failed containers
+ // Count of failed containers
private AtomicInteger numFailedContainers = new AtomicInteger();
// Count of containers already requested from the RM
- // Needed as once requested, we should not request for containers again and again.
- // Only request for more if the original requirement changes.
+ // Needed as once requested, we should not request for containers again.
+ // Only request for more if the original requirement changes.
private AtomicInteger numRequestedContainers = new AtomicInteger();
- // Shell command to be executed
- private String shellCommand = "";
+ // Shell command to be executed
+ private String shellCommand = "";
// Args to be passed to the shell command
private String shellArgs = "";
- // Env variables to be setup for the shell command
+ // Env variables to be setup for the shell command
private Map<String, String> shellEnv = new HashMap<String, String>();
// Location of shell script ( obtained from info set in env )
// Shell script path in fs
- private String shellScriptPath = "";
+ private String shellScriptPath = "";
// Timestamp needed for creating a local resource
private long shellScriptPathTimestamp = 0;
// File length needed for local resource
@@ -179,9 +202,6 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh";
- // Containers to be released
- private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
-
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
@@ -205,8 +225,7 @@ public class ApplicationMaster {
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
- }
- else {
+ } else {
LOG.info("Application Master failed. exiting");
System.exit(2);
}
@@ -221,7 +240,8 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv();
for (Map.Entry<String, String> env : envs.entrySet()) {
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
- System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+ System.out.println("System env: key=" + env.getKey() + ", val="
+ + env.getValue());
}
String cmd = "ls -al";
@@ -231,9 +251,10 @@ public class ApplicationMaster {
pr = run.exec(cmd);
pr.waitFor();
- BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+ BufferedReader buf = new BufferedReader(new InputStreamReader(
+ pr.getInputStream()));
String line = "";
- while ((line=buf.readLine())!=null) {
+ while ((line = buf.readLine()) != null) {
LOG.info("System CWD content: " + line);
System.out.println("System CWD content: " + line);
}
@@ -242,31 +263,39 @@ public class ApplicationMaster {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
- }
+ }
}
public ApplicationMaster() throws Exception {
// Set up the configuration and RPC
- conf = new Configuration();
+ conf = new YarnConfiguration();
rpc = YarnRPC.create(conf);
}
+
/**
* Parse command line options
- * @param args Command line args
- * @return Whether init successful and run should be invoked
+ *
+ * @param args Command line args
+ * @return Whether init successful and run should be invoked
* @throws ParseException
- * @throws IOException
+ * @throws IOException
*/
public boolean init(String[] args) throws ParseException, IOException {
Options opts = new Options();
- opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
- opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
- opts.addOption("shell_script", true, "Location of the shell script to be executed");
+ opts.addOption("app_attempt_id", true,
+ "App Attempt ID. Not to be used unless for testing purposes");
+ opts.addOption("shell_command", true,
+ "Shell command to be executed by the Application Master");
+ opts.addOption("shell_script", true,
+ "Location of the shell script to be executed");
opts.addOption("shell_args", true, "Command line args for the shell script");
- opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
- opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
- opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+ opts.addOption("shell_env", true,
+ "Environment for shell script. Specified as env_key=env_val pairs");
+ opts.addOption("container_memory", true,
+ "Amount of memory in MB to be requested to run the shell command");
+ opts.addOption("num_containers", true,
+ "No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("debug", false, "Dump out debug information");
@@ -275,7 +304,8 @@ public class ApplicationMaster {
if (args.length == 0) {
printUsage(opts);
- throw new IllegalArgumentException("No args specified for application master to initialize");
+ throw new IllegalArgumentException(
+ "No args specified for application master to initialize");
}
if (cliParser.hasOption("help")) {
@@ -289,7 +319,6 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv();
- appAttemptID = Records.newRecord(ApplicationAttemptId.class);
if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
appAttemptID = ConverterUtils.toApplicationAttemptId(envs
.get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
@@ -297,29 +326,31 @@ public class ApplicationMaster {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
- }
- else {
- throw new IllegalArgumentException("Application Attempt Id not set in the environment");
+ } else {
+ throw new IllegalArgumentException(
+ "Application Attempt Id not set in the environment");
}
} else {
- ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
+ ContainerId containerId = ConverterUtils.toContainerId(envs
+ .get(ApplicationConstants.AM_CONTAINER_ID_ENV));
appAttemptID = containerId.getApplicationAttemptId();
}
- LOG.info("Application master for app"
- + ", appId=" + appAttemptID.getApplicationId().getId()
- + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
+ LOG.info("Application master for app" + ", appId="
+ + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+ + appAttemptID.getApplicationId().getClusterTimestamp()
+ ", attemptId=" + appAttemptID.getAttemptId());
if (!cliParser.hasOption("shell_command")) {
- throw new IllegalArgumentException("No shell command specified to be executed by application master");
+ throw new IllegalArgumentException(
+ "No shell command specified to be executed by application master");
}
shellCommand = cliParser.getOptionValue("shell_command");
if (cliParser.hasOption("shell_args")) {
shellArgs = cliParser.getOptionValue("shell_args");
}
- if (cliParser.hasOption("shell_env")) {
+ if (cliParser.hasOption("shell_env")) {
String shellEnvs[] = cliParser.getOptionValues("shell_env");
for (String env : shellEnvs) {
env = env.trim();
@@ -330,8 +361,8 @@ public class ApplicationMaster {
}
String key = env.substring(0, index);
String val = "";
- if (index < (env.length()-1)) {
- val = env.substring(index+1);
+ if (index < (env.length() - 1)) {
+ val = env.substring(index + 1);
}
shellEnv.put(key, val);
}
@@ -341,32 +372,37 @@ public class ApplicationMaster {
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
- shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
+ shellScriptPathTimestamp = Long.valueOf(envs
+ .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
}
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
- shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
+ shellScriptPathLen = Long.valueOf(envs
+ .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
}
if (!shellScriptPath.isEmpty()
- && (shellScriptPathTimestamp <= 0
- || shellScriptPathLen <= 0)) {
- LOG.error("Illegal values in env for shell script path"
- + ", path=" + shellScriptPath
- + ", len=" + shellScriptPathLen
- + ", timestamp=" + shellScriptPathTimestamp);
- throw new IllegalArgumentException("Illegal values in env for shell script path");
+ && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
+ LOG.error("Illegal values in env for shell script path" + ", path="
+ + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+ + shellScriptPathTimestamp);
+ throw new IllegalArgumentException(
+ "Illegal values in env for shell script path");
}
}
- containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
- numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
- requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+ containerMemory = Integer.parseInt(cliParser.getOptionValue(
+ "container_memory", "10"));
+ numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
+ "num_containers", "1"));
+ requestPriority = Integer.parseInt(cliParser
+ .getOptionValue("priority", "0"));
return true;
}
/**
- * Helper function to print usage
+ * Helper function to print usage
+ *
* @param opts Parsed command line options
*/
private void printUsage(Options opts) {
@@ -375,228 +411,240 @@ public class ApplicationMaster {
/**
* Main run function for the application master
+ *
* @throws YarnRemoteException
*/
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
// Connect to ResourceManager
- resourceManager = connectToRM();
+ resourceManager = new AMRMClientImpl(appAttemptID);
+ resourceManager.init(conf);
+ resourceManager.start();
- // Setup local RPC Server to accept status requests directly from clients
- // TODO need to setup a protocol for client to be able to communicate to the RPC server
- // TODO use the rpc port info to register with the RM for the client to send requests to this app master
-
- // Register self with ResourceManager
- RegisterApplicationMasterResponse response = registerToRM();
- // Dump out information about cluster capability as seen by the resource manager
- int minMem = response.getMinimumResourceCapability().getMemory();
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Min mem capabililty of resources in this cluster " + minMem);
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
- // a multiple of the min value and cannot exceed the max.
- // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
- if (containerMemory < minMem) {
- LOG.info("Container memory specified below min threshold of cluster. Using min value."
- + ", specified=" + containerMemory
- + ", min=" + minMem);
- containerMemory = minMem;
- }
- else if (containerMemory > maxMem) {
- LOG.info("Container memory specified above max threshold of cluster. Using max value."
- + ", specified=" + containerMemory
- + ", max=" + maxMem);
- containerMemory = maxMem;
- }
-
- // Setup heartbeat emitter
- // TODO poll RM every now and then with an empty request to let RM know that we are alive
- // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
- // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
- // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter
- // is not required.
-
- // Setup ask for containers from RM
- // Send request for containers to RM
- // Until we get our fully allocated quota, we keep on polling RM for containers
- // Keep looping until all the containers are launched and shell script executed on them
- // ( regardless of success/failure).
-
- int loopCounter = -1;
-
- while (numCompletedContainers.get() < numTotalContainers
- && !appDone) {
- loopCounter++;
-
- // log current state
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone
- + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers
- + ", completed=" + numCompletedContainers
- + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // Sleep before each loop when asking RM for containers
- // to avoid flooding RM with spurious requests when it
- // need not have any available containers
- // Sleeping for 1000 ms.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted " + e.getMessage());
- }
-
- // No. of containers to request
- // For the first loop, askCount will be equal to total containers needed
- // From that point on, askCount will always be 0 as current implementation
- // does not change its ask on container failures.
- int askCount = numTotalContainers - numRequestedContainers.get();
- numRequestedContainers.addAndGet(askCount);
-
- // Setup request to be sent to RM to allocate containers
- List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
- if (askCount > 0) {
- ResourceRequest containerAsk = setupContainerAskForRM(askCount);
- resourceReq.add(containerAsk);
- }
-
- // Send the request to RM
- LOG.info("Asking RM for containers"
- + ", askCount=" + askCount);
- AMResponse amResp =sendContainerAskToRM(resourceReq);
-
- // Retrieve list of allocated containers from the response
- List<Container> allocatedContainers = amResp.getAllocatedContainers();
- LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerState" + allocatedContainer.getState()
- + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
- //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
-
- LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
- Thread launchThread = new Thread(runnableLaunchContainer);
-
- // launch and start the container on a separate thread to keep the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchThread.start();
- }
-
- // Check what the current available resources in the cluster are
- // TODO should we do anything if the available resources are not enough?
- Resource availableResources = amResp.getAvailableResources();
- LOG.info("Current available resources in the cluster " + availableResources);
-
- // Check the completed containers
- List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
- LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
- + ", state=" + containerStatus.getState()
- + ", exitStatus=" + containerStatus.getExitStatus()
- + ", diagnostics=" + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- assert(containerStatus.getState() == ContainerState.COMPLETE);
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- if (-100 != exitStatus) {
- // shell script failed
- // counts as completed
+ try {
+ // Setup local RPC Server to accept status requests directly from clients
+ // TODO need to setup a protocol for client to be able to communicate to
+ // the RPC server
+ // TODO use the rpc port info to register with the RM for the client to
+ // send requests to this app master
+
+ // Register self with ResourceManager
+ RegisterApplicationMasterResponse response = resourceManager
+ .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ appMasterTrackingUrl);
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int minMem = response.getMinimumResourceCapability().getMemory();
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask has to be atleast the minimum of the capability of the
+ // cluster, the value has to be a multiple of the min value and cannot
+ // exceed the max.
+ // If it is not an exact multiple of min, the RM will allocate to the
+ // nearest multiple of min
+ if (containerMemory < minMem) {
+ LOG.info("Container memory specified below min threshold of cluster."
+ + " Using min value." + ", specified=" + containerMemory + ", min="
+ + minMem);
+ containerMemory = minMem;
+ } else if (containerMemory > maxMem) {
+ LOG.info("Container memory specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerMemory + ", max="
+ + maxMem);
+ containerMemory = maxMem;
+ }
+
+ // Setup heartbeat emitter
+ // TODO poll RM every now and then with an empty request to let RM know
+ // that we are alive
+ // The heartbeat interval after which an AM is timed out by the RM is
+ // defined by a config setting:
+ // RM_AM_EXPIRY_INTERVAL_MS with default defined by
+ // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
+ // The allocate calls to the RM count as heartbeats so, for now,
+ // this additional heartbeat emitter is not required.
+
+ // Setup ask for containers from RM
+ // Send request for containers to RM
+ // Until we get our fully allocated quota, we keep on polling RM for
+ // containers
+ // Keep looping until all the containers are launched and shell script
+ // executed on them ( regardless of success/failure).
+
+ int loopCounter = -1;
+
+ while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+ loopCounter++;
+
+ // log current state
+ LOG.info("Current application state: loop=" + loopCounter
+ + ", appDone=" + appDone + ", total=" + numTotalContainers
+ + ", requested=" + numRequestedContainers + ", completed="
+ + numCompletedContainers + ", failed=" + numFailedContainers
+ + ", currentAllocated=" + numAllocatedContainers);
+
+ // Sleep before each loop when asking RM for containers
+ // to avoid flooding RM with spurious requests when it
+ // need not have any available containers
+ // Sleeping for 1000 ms.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted " + e.getMessage());
+ }
+
+ // No. of containers to request
+ // For the first loop, askCount will be equal to total containers needed
+ // From that point on, askCount will always be 0 as current
+ // implementation does not change its ask on container failures.
+ int askCount = numTotalContainers - numRequestedContainers.get();
+ numRequestedContainers.addAndGet(askCount);
+
+ if (askCount > 0) {
+ ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+ resourceManager.addContainerRequest(containerAsk);
+ }
+
+ // Send the request to RM
+ LOG.info("Asking RM for containers" + ", askCount=" + askCount);
+ AMResponse amResp = sendContainerAskToRM();
+
+ // Retrieve list of allocated containers from the response
+ List<Container> allocatedContainers = amResp.getAllocatedContainers();
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ + ", containerState" + allocatedContainer.getState()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+ // + ", containerToken"
+ // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+ allocatedContainer);
+ Thread launchThread = new Thread(runnableLaunchContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
+ }
+
+ // Check what the current available resources in the cluster are
+ // TODO should we do anything if the available resources are not enough?
+ Resource availableResources = amResp.getAvailableResources();
+ LOG.info("Current available resources in the cluster "
+ + availableResources);
+
+ // Check the completed containers
+ List<ContainerStatus> completedContainers = amResp
+ .getCompletedContainersStatuses();
+ LOG.info("Got response from RM for container ask, completedCnt="
+ + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (-100 != exitStatus) {
+ // shell script failed
+ // counts as completed
+ numCompletedContainers.incrementAndGet();
+ numFailedContainers.incrementAndGet();
+ } else {
+ // something else bad happened
+ // app job did not complete for some reason
+ // we should re-try as the container was lost for some reason
+ numAllocatedContainers.decrementAndGet();
+ numRequestedContainers.decrementAndGet();
+ // we do not need to release the container as it would be done
+ // by the RM/CM.
+ }
+ } else {
+ // nothing to do
+ // container completed successfully
numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- }
- else {
- // something else bad happened
- // app job did not complete for some reason
- // we should re-try as the container was lost for some reason
- numAllocatedContainers.decrementAndGet();
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as it would be done
- // by the RM/CM.
+ LOG.info("Container completed successfully." + ", containerId="
+ + containerStatus.getContainerId());
}
}
- else {
- // nothing to do
- // container completed successfully
- numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully."
- + ", containerId=" + containerStatus.getContainerId());
+ if (numCompletedContainers.get() == numTotalContainers) {
+ appDone = true;
}
- }
- if (numCompletedContainers.get() == numTotalContainers) {
- appDone = true;
+ LOG.info("Current application state: loop=" + loopCounter
+ + ", appDone=" + appDone + ", total=" + numTotalContainers
+ + ", requested=" + numRequestedContainers + ", completed="
+ + numCompletedContainers + ", failed=" + numFailedContainers
+ + ", currentAllocated=" + numAllocatedContainers);
+
+ // TODO
+ // Add a timeout handling layer
+ // for misbehaving shell commands
}
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone
- + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers
- + ", completed=" + numCompletedContainers
- + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // TODO
- // Add a timeout handling layer
- // for misbehaving shell commands
- }
-
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- e.printStackTrace();
+ // Join all launched threads
+ // needed for when we time out
+ // and we need to release containers
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ e.printStackTrace();
+ }
}
- }
- // When the application completes, it should send a finish application signal
- // to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setAppAttemptId(appAttemptID);
- boolean isSuccess = true;
- if (numFailedContainers.get() == 0) {
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
- }
- else {
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
- String diagnostics = "Diagnostics."
- + ", total=" + numTotalContainers
- + ", completed=" + numCompletedContainers.get()
- + ", allocated=" + numAllocatedContainers.get()
- + ", failed=" + numFailedContainers.get();
- finishReq.setDiagnostics(diagnostics);
- isSuccess = false;
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ boolean isSuccess = true;
+ if (numFailedContainers.get() == 0) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ + ", completed=" + numCompletedContainers.get() + ", allocated="
+ + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ isSuccess = false;
+ }
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ return isSuccess;
+ } finally {
+ resourceManager.stop();
}
- resourceManager.finishApplicationMaster(finishReq);
- return isSuccess;
}
/**
- * Thread to connect to the {@link ContainerManager} and
- * launch the container that will execute the shell command.
+ * Thread to connect to the {@link ContainerManager} and launch the container
+ * that will execute the shell command.
*/
private class LaunchContainerRunnable implements Runnable {
- // Allocated container
+ // Allocated container
Container container;
// Handle to communicate with ContainerManager
ContainerManager cm;
@@ -612,15 +660,16 @@ public class ApplicationMaster {
* Helper function to connect to CM
*/
private void connectToCM() {
- LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
+ LOG.debug("Connecting to ContainerManager for containerid="
+ + container.getId());
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
- this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
+ this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
+ cmAddress, conf));
}
-
@Override
/**
* Connects to CM, sets up container launch context
@@ -628,11 +677,13 @@ public class ApplicationMaster {
* start request to the CM.
*/
public void run() {
- // Connect to ContainerManager
+ // Connect to ContainerManager
connectToCM();
- LOG.info("Setting up container launch container for containerid=" + container.getId());
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ LOG.info("Setting up container launch container for containerid="
+ + container.getId());
+ ContainerLaunchContext ctx = Records
+ .newRecord(ContainerLaunchContext.class);
ctx.setContainerId(container.getId());
ctx.setResource(container.getResource());
@@ -642,28 +693,30 @@ public class ApplicationMaster {
ctx.setUser(jobUserName);
LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);
- // Set the environment
+ // Set the environment
ctx.setEnvironment(shellEnv);
- // Set the local resources
+ // Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- // The container for the eventual shell commands needs its own local resources too.
- // In this scenario, if a shell script is specified, we need to have it copied
- // and made available to the container.
+ // The container for the eventual shell commands needs its own local
+ // resources too.
+ // In this scenario, if a shell script is specified, we need to have it
+ // copied and made available to the container.
if (!shellScriptPath.isEmpty()) {
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
- shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
+ shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
+ shellScriptPath)));
} catch (URISyntaxException e) {
- LOG.error("Error when trying to use shell script path specified in env"
- + ", path=" + shellScriptPath);
+ LOG.error("Error when trying to use shell script path specified"
+ + " in env, path=" + shellScriptPath);
e.printStackTrace();
- // A failure scenario on bad input such as invalid shell script path
- // We know we cannot continue launching the container
+ // A failure scenario on bad input such as invalid shell script path
+ // We know we cannot continue launching the container
// so we should release it.
// TODO
numCompletedContainers.incrementAndGet();
@@ -676,12 +729,12 @@ public class ApplicationMaster {
}
ctx.setLocalResources(localResources);
- // Set the necessary command to execute on the allocated container
+ // Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
- // Set executable command
+ // Set executable command
vargs.add(shellCommand);
- // Set shell script path
+ // Set shell script path
if (!shellScriptPath.isEmpty()) {
vargs.add(ExecShellStringPath);
}
@@ -689,11 +742,6 @@ public class ApplicationMaster {
// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
- // TODO
- // We should redirect the output to hdfs instead of local logs
- // so as to be able to look at the final output after the containers
- // have been released.
- // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
@@ -707,131 +755,78 @@ public class ApplicationMaster {
commands.add(command.toString());
ctx.setCommands(commands);
- StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+ StartContainerRequest startReq = Records
+ .newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
try {
cm.startContainer(startReq);
} catch (YarnRemoteException e) {
- LOG.info("Start container failed for :"
- + ", containerId=" + container.getId());
+ LOG.info("Start container failed for :" + ", containerId="
+ + container.getId());
e.printStackTrace();
- // TODO do we need to release this container?
+ // TODO do we need to release this container?
}
// Get container status?
- // Left commented out as the shell scripts are short lived
- // and we are relying on the status for completed containers from RM to detect status
-
- // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
- // statusReq.setContainerId(container.getId());
- // GetContainerStatusResponse statusResp;
- //try {
- //statusResp = cm.getContainerStatus(statusReq);
- // LOG.info("Container Status"
- // + ", id=" + container.getId()
- // + ", status=" +statusResp.getStatus());
- //} catch (YarnRemoteException e) {
- //e.printStackTrace();
- //}
+ // Left commented out as the shell scripts are short lived
+ // and we are relying on the status for completed containers
+ // from RM to detect status
+
+ // GetContainerStatusRequest statusReq =
+ // Records.newRecord(GetContainerStatusRequest.class);
+ // statusReq.setContainerId(container.getId());
+ // GetContainerStatusResponse statusResp;
+ // try {
+ // statusResp = cm.getContainerStatus(statusReq);
+ // LOG.info("Container Status"
+ // + ", id=" + container.getId()
+ // + ", status=" +statusResp.getStatus());
+ // } catch (YarnRemoteException e) {
+ // e.printStackTrace();
+ // }
}
}
/**
- * Connect to the Resource Manager
- * @return Handle to communicate with the RM
- */
- private AMRMProtocol connectToRM() {
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress = yarnConf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
- }
-
- /**
- * Register the Application Master to the Resource Manager
- * @return the registration response from the RM
- * @throws YarnRemoteException
- */
- private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
- RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
-
- // set the required info into the registration request:
- // application attempt id,
- // host on which the app master is running
- // rpc port on which the app master accepts requests from the client
- // tracking url for the app master
- appMasterRequest.setApplicationAttemptId(appAttemptID);
- appMasterRequest.setHost(appMasterHostname);
- appMasterRequest.setRpcPort(appMasterRpcPort);
- appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
- return resourceManager.registerApplicationMaster(appMasterRequest);
- }
-
- /**
* Setup the request that will be sent to the RM for the container ask.
+ *
* @param numContainers Containers to ask for from RM
* @return the setup ResourceRequest to be sent to RM
*/
- private ResourceRequest setupContainerAskForRM(int numContainers) {
- ResourceRequest request = Records.newRecord(ResourceRequest.class);
-
- // setup requirements for hosts
- // whether a particular rack/host is needed
- // Refer to apis under org.apache.hadoop.net for more
- // details on how to get figure out rack/host mapping.
+ private ContainerRequest setupContainerAskForRM(int numContainers) {
+ // setup requirements for hosts
// using * as any host will do for the distributed shell app
- request.setHostName("*");
-
- // set no. of containers needed
- request.setNumContainers(numContainers);
-
// set the priority for the request
Priority pri = Records.newRecord(Priority.class);
- // TODO - what is the range for priority? how to decide?
+ // TODO - what is the range for priority? how to decide?
pri.setPriority(requestPriority);
- request.setPriority(pri);
// Set up resource type requirements
// For now, only memory is supported so we set memory requirements
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(containerMemory);
- request.setCapability(capability);
+ ContainerRequest request = new ContainerRequest(capability, null, null,
+ pri, numContainers);
+ LOG.info("Requested container ask: " + request.toString());
return request;
}
/**
* Ask RM to allocate given no. of containers to this Application Master
+ *
* @param requestedContainers Containers to ask for from RM
- * @return Response from RM to AM with allocated containers
+ * @return Response from RM to AM with allocated containers
* @throws YarnRemoteException
*/
- private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
- throws YarnRemoteException {
- AllocateRequest req = Records.newRecord(AllocateRequest.class);
- req.setResponseId(rmRequestID.incrementAndGet());
- req.setApplicationAttemptId(appAttemptID);
- req.addAllAsks(requestedContainers);
- req.addAllReleases(releasedContainers);
- req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
-
- LOG.info("Sending request to RM for containers"
- + ", requestedSet=" + requestedContainers.size()
- + ", releasedSet=" + releasedContainers.size()
- + ", progress=" + req.getProgress());
+ private AMResponse sendContainerAskToRM() throws YarnRemoteException {
+ float progressIndicator = (float) numCompletedContainers.get()
+ / numTotalContainers;
- for (ResourceRequest rsrcReq : requestedContainers) {
- LOG.info("Requested container ask: " + rsrcReq.toString());
- }
- for (ContainerId id : releasedContainers) {
- LOG.info("Released container, id=" + id.getId());
- }
+ LOG.info("Sending request to RM for containers" + ", progress="
+ + progressIndicator);
- AllocateResponse resp = resourceManager.allocate(req);
+ AllocateResponse resp = resourceManager.allocate(progressIndicator);
return resp.getAMResponse();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Thu Jan 24 22:44:10 2013
@@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.applications.distributedshell;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -212,7 +209,7 @@ public class Client extends YarnClientIm
/**
*/
public Client() throws Exception {
- this(new Configuration());
+ this(new YarnConfiguration());
}
/**
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Thu Jan 24 22:44:10 2013
@@ -120,6 +120,7 @@ public class TestDistributedShell {
boolean exceptionThrown = false;
try {
boolean initSuccess = client.init(args);
+ Assert.assertTrue(initSuccess);
}
catch (IllegalArgumentException e) {
exceptionThrown = true;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml Thu Jan 24 22:44:10 2013
@@ -93,6 +93,7 @@
<executions>
<execution>
<id>version-info</id>
+ <phase>compile</phase>
<goals>
<goal>version-info</goal>
</goals>
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Jan 24 22:44:10 2013
@@ -236,6 +236,10 @@ public class YarnConfiguration extends C
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
+ /** URI for FileSystemRMStateStore */
+ public static final String FS_RM_STATE_STORE_URI =
+ RM_PREFIX + "fs.rm-state-store.uri";
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Thu Jan 24 22:44:10 2013
@@ -230,6 +230,17 @@
<property>
<description>The class to use as the persistent store.</description>
<name>yarn.resourcemanager.store.class</name>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
+ </property>
+
+ <property>
+ <description>URI pointing to the location of the FileSystem path where
+ RM state will be stored. This must be supplied when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+ as the value for yarn.resourcemanager.store.class</description>
+ <name>yarn.resourcemanager.fs.rm-state-store.uri</name>
+ <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
+ <!--value>hdfs://localhost:9000/rmstore</value-->
</property>
<property>
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Thu Jan 24 22:44:10 2013
@@ -41,6 +41,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Thu Jan 24 22:44:10 2013
@@ -18,10 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+@Unstable
public class NullRMStateStore extends RMStateStore {
@Override
@@ -36,7 +38,7 @@ public class NullRMStateStore extends RM
@Override
public RMState loadState() throws Exception {
- return null;
+ throw new UnsupportedOperationException("Cannot load state from null store");
}
@Override
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jan 24 22:44:10 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -502,8 +503,11 @@ public class FairScheduler implements Re
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
- LOG.info("User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName());
+ String msg = "User " + userUgi.getUserName() +
+ " cannot submit applications to queue " + queue.getName();
+ LOG.info(msg);
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
return;
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Jan 24 22:44:10 2013
@@ -41,20 +41,29 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -1357,4 +1366,59 @@ public class TestFairScheduler {
assertEquals(2, app1.getLiveContainers().size());
assertEquals(1, app2.getLiveContainers().size());
}
+
+  /**
+   * A user who is not listed in a queue's aclSubmitApps must have the
+   * application attempt rejected, which is expected to leave the
+   * application with FinalApplicationStatus.FAILED.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNotAllowSubmitApplication() throws Exception {
+    // Set acl's
+    Configuration conf = createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<aclSubmitApps>userallow</aclSubmitApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    int appId = this.APP_ID++;
+    String user = "usernotallow";
+    String queue = "queue1";
+    ApplicationId applicationId = MockApps.newAppID(appId);
+    String name = MockApps.newAppName();
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(resourceManager.getRMContext(), scheduler);
+    ApplicationSubmissionContext submissionContext = new ApplicationSubmissionContextPBImpl();
+    RMApp application =
+        new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
+          queue, submissionContext, scheduler, masterService,
+          System.currentTimeMillis());
+    resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
+    application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
+
+    ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    attId.setAttemptId(this.ATTEMPT_ID++);
+    attId.setApplicationId(applicationId);
+    scheduler.addApplication(attId, queue, user);
+
+    // The scheduler reports the rejection through the RM context's dispatcher,
+    // so poll (up to MAX_TRIES * 100 ms) for the application to finish.
+    final int MAX_TRIES = 20;
+    int numTries = 0;
+    while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
+      // Let InterruptedException propagate; the test method declares it.
+      Thread.sleep(100);
+      numTries++;
+    }
+    assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
+  }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Thu Jan 24 22:44:10 2013
@@ -114,4 +114,21 @@ public class WebAppProxy extends Abstrac
}
super.stop();
}
+
+  /**
+   * Blocks until {@code proxyServer} finishes, by delegating to its
+   * {@code join()}. Does nothing when {@code proxyServer} is null.
+   * If the calling thread is interrupted while waiting, its interrupt
+   * status is restored and this method returns without waiting further.
+   */
+  public void join() {
+    if (proxyServer != null) {
+      try {
+        proxyServer.join();
+      } catch (InterruptedException e) {
+        // Don't swallow the interrupt: re-assert it so callers can observe it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java?rev=1438243&r1=1438242&r2=1438243&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxyServer.java Thu Jan 24 22:44:10 2013
@@ -73,6 +73,14 @@ public class WebAppProxyServer extends C
YarnConfiguration.PROXY_PRINCIPAL);
}
+  /**
+   * Blocks the calling thread until the wrapped {@code proxy} stops.
+   * (Normally, it runs forever.)
+   */
+  private void join() {
+    this.proxy.join();
+  }
+
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
@@ -84,6 +92,7 @@ public class WebAppProxyServer extends C
YarnConfiguration conf = new YarnConfiguration();
proxy.init(conf);
proxy.start();
+ proxy.join();
} catch (Throwable t) {
LOG.fatal("Error starting Proxy server", t);
System.exit(-1);