You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/04/01 18:47:34 UTC
svn commit: r1463203 [2/8] - in
/hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/bin/ hadoop-yarn/conf/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Mon Apr 1 16:47:16 2013
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -64,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -148,8 +147,8 @@ public class ApplicationMaster {
private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMClient resourceManager;
-
+ private AMRMClientAsync resourceManager;
+
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@@ -170,8 +169,6 @@ public class ApplicationMaster {
// Priority of the request
private int requestPriority;
- // Simple flag to denote whether all works is done
- private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
@@ -202,6 +199,9 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh";
+ private volatile boolean done;
+ private volatile boolean success;
+
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
@@ -319,10 +319,7 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv();
- if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
- appAttemptID = ConverterUtils.toApplicationAttemptId(envs
- .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
- } else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+ if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@@ -336,6 +333,23 @@ public class ApplicationMaster {
appAttemptID = containerId.getApplicationAttemptId();
}
+ if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
+ throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+ + " not set in the environment");
+ }
+ if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) {
+ throw new RuntimeException(ApplicationConstants.NM_HOST_ENV
+ + " not set in the environment");
+ }
+ if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) {
+ throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV
+ + " not set in the environment");
+ }
+ if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) {
+ throw new RuntimeException(ApplicationConstants.NM_PORT_ENV
+ + " not set in the environment");
+ }
+
LOG.info("Application master for app" + ", appId="
+ appAttemptID.getApplicationId().getId() + ", clustertimestamp="
+ appAttemptID.getApplicationId().getClusterTimestamp()
@@ -394,6 +408,10 @@ public class ApplicationMaster {
"container_memory", "10"));
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
+ if (numTotalContainers == 0) {
+ throw new IllegalArgumentException(
+ "Cannot run distributed shell with no containers");
+ }
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
@@ -417,225 +435,202 @@ public class ApplicationMaster {
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
- // Connect to ResourceManager
- resourceManager = new AMRMClientImpl(appAttemptID);
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+
+ resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
- try {
- // Setup local RPC Server to accept status requests directly from clients
- // TODO need to setup a protocol for client to be able to communicate to
- // the RPC server
- // TODO use the rpc port info to register with the RM for the client to
- // send requests to this app master
-
- // Register self with ResourceManager
- RegisterApplicationMasterResponse response = resourceManager
- .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- appMasterTrackingUrl);
- // Dump out information about cluster capability as seen by the
- // resource manager
- int minMem = response.getMinimumResourceCapability().getMemory();
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Min mem capabililty of resources in this cluster " + minMem);
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- // A resource ask has to be atleast the minimum of the capability of the
- // cluster, the value has to be a multiple of the min value and cannot
- // exceed the max.
- // If it is not an exact multiple of min, the RM will allocate to the
- // nearest multiple of min
- if (containerMemory < minMem) {
- LOG.info("Container memory specified below min threshold of cluster."
- + " Using min value." + ", specified=" + containerMemory + ", min="
- + minMem);
- containerMemory = minMem;
- } else if (containerMemory > maxMem) {
- LOG.info("Container memory specified above max threshold of cluster."
- + " Using max value." + ", specified=" + containerMemory + ", max="
- + maxMem);
- containerMemory = maxMem;
- }
-
- // Setup heartbeat emitter
- // TODO poll RM every now and then with an empty request to let RM know
- // that we are alive
- // The heartbeat interval after which an AM is timed out by the RM is
- // defined by a config setting:
- // RM_AM_EXPIRY_INTERVAL_MS with default defined by
- // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
- // The allocate calls to the RM count as heartbeats so, for now,
- // this additional heartbeat emitter is not required.
-
- // Setup ask for containers from RM
- // Send request for containers to RM
- // Until we get our fully allocated quota, we keep on polling RM for
- // containers
- // Keep looping until all the containers are launched and shell script
- // executed on them ( regardless of success/failure).
-
- int loopCounter = -1;
-
- while (numCompletedContainers.get() < numTotalContainers && !appDone) {
- loopCounter++;
-
- // log current state
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // Sleep before each loop when asking RM for containers
- // to avoid flooding RM with spurious requests when it
- // need not have any available containers
- // Sleeping for 1000 ms.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted " + e.getMessage());
- }
+ // Setup local RPC Server to accept status requests directly from clients
+ // TODO need to setup a protocol for client to be able to communicate to
+ // the RPC server
+ // TODO use the rpc port info to register with the RM for the client to
+ // send requests to this app master
+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ RegisterApplicationMasterResponse response = resourceManager
+ .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ appMasterTrackingUrl);
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int minMem = response.getMinimumResourceCapability().getMemory();
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask has to be atleast the minimum of the capability of the
+ // cluster, the value has to be a multiple of the min value and cannot
+ // exceed the max.
+ // If it is not an exact multiple of min, the RM will allocate to the
+ // nearest multiple of min
+ if (containerMemory < minMem) {
+ LOG.info("Container memory specified below min threshold of cluster."
+ + " Using min value." + ", specified=" + containerMemory + ", min="
+ + minMem);
+ containerMemory = minMem;
+ } else if (containerMemory > maxMem) {
+ LOG.info("Container memory specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerMemory + ", max="
+ + maxMem);
+ containerMemory = maxMem;
+ }
- // No. of containers to request
- // For the first loop, askCount will be equal to total containers needed
- // From that point on, askCount will always be 0 as current
- // implementation does not change its ask on container failures.
- int askCount = numTotalContainers - numRequestedContainers.get();
- numRequestedContainers.addAndGet(askCount);
-
- if (askCount > 0) {
- ContainerRequest containerAsk = setupContainerAskForRM(askCount);
- resourceManager.addContainerRequest(containerAsk);
- }
- // Send the request to RM
- LOG.info("Asking RM for containers" + ", askCount=" + askCount);
- AMResponse amResp = sendContainerAskToRM();
-
- // Retrieve list of allocated containers from the response
- List<Container> allocatedContainers = amResp.getAllocatedContainers();
- LOG.info("Got response from RM for container ask, allocatedCnt="
- + allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerState" + allocatedContainer.getState()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
- LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
- allocatedContainer);
- Thread launchThread = new Thread(runnableLaunchContainer);
-
- // launch and start the container on a separate thread to keep
- // the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchThread.start();
- }
+ // Setup ask for containers from RM
+ // Send request for containers to RM
+ // Until we get our fully allocated quota, we keep on polling RM for
+ // containers
+ // Keep looping until all the containers are launched and shell script
+ // executed on them ( regardless of success/failure).
+ ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+ resourceManager.addContainerRequest(containerAsk);
+ numRequestedContainers.set(numTotalContainers);
- // Check what the current available resources in the cluster are
- // TODO should we do anything if the available resources are not enough?
- Resource availableResources = amResp.getAvailableResources();
- LOG.info("Current available resources in the cluster "
- + availableResources);
-
- // Check the completed containers
- List<ContainerStatus> completedContainers = amResp
- .getCompletedContainersStatuses();
- LOG.info("Got response from RM for container ask, completedCnt="
- + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
- + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- assert (containerStatus.getState() == ContainerState.COMPLETE);
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- if (-100 != exitStatus) {
- // shell script failed
- // counts as completed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- } else {
- // something else bad happened
- // app job did not complete for some reason
- // we should re-try as the container was lost for some reason
- numAllocatedContainers.decrementAndGet();
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as it would be done
- // by the RM/CM.
- }
- } else {
- // nothing to do
- // container completed successfully
+ while (!done) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {}
+ }
+ finish();
+
+ return success;
+ }
+
+ private void finish() {
+ // Join all launched threads
+ // needed for when we time out
+ // and we need to release containers
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ success = true;
+ if (numFailedContainers.get() == 0) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ + ", completed=" + numCompletedContainers.get() + ", allocated="
+ + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ success = false;
+ }
+ try {
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnRemoteException ex) {
+ LOG.error("Failed to unregister application", ex);
+ }
+
+ done = true;
+ resourceManager.stop();
+ }
+
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt="
+ + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+ // shell script failed
+ // counts as completed
numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId="
- + containerStatus.getContainerId());
+ numFailedContainers.incrementAndGet();
+ } else {
+ // container was killed by framework, possibly preempted
+ // we should re-try as the container was lost for some reason
+ numAllocatedContainers.decrementAndGet();
+ numRequestedContainers.decrementAndGet();
+ // we do not need to release the container as it would be done
+ // by the RM
}
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId="
+ + containerStatus.getContainerId());
}
- if (numCompletedContainers.get() == numTotalContainers) {
- appDone = true;
- }
-
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // TODO
- // Add a timeout handling layer
- // for misbehaving shell commands
}
-
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- e.printStackTrace();
- }
+
+ // ask for more containers if any failed
+ int askCount = numTotalContainers - numRequestedContainers.get();
+ numRequestedContainers.addAndGet(askCount);
+
+ if (askCount > 0) {
+ ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+ resourceManager.addContainerRequest(containerAsk);
+ }
+
+ // set progress to deliver to RM on next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ resourceManager.setProgress(progress);
+
+ if (numCompletedContainers.get() == numTotalContainers) {
+ done = true;
}
+ }
- // When the application completes, it should send a finish application
- // signal to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinalApplicationStatus appStatus;
- String appMessage = null;
- boolean isSuccess = true;
- if (numFailedContainers.get() == 0) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else {
- appStatus = FinalApplicationStatus.FAILED;
- appMessage = "Diagnostics." + ", total=" + numTotalContainers
- + ", completed=" + numCompletedContainers.get() + ", allocated="
- + numAllocatedContainers.get() + ", failed="
- + numFailedContainers.get();
- isSuccess = false;
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ + ", containerState" + allocatedContainer.getState()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+ // + ", containerToken"
+ // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+ allocatedContainer);
+ Thread launchThread = new Thread(runnableLaunchContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
}
- resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
- return isSuccess;
- } finally {
- resourceManager.stop();
}
+
+ @Override
+ public void onRebootRequest() {}
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
}
/**
@@ -811,22 +806,4 @@ public class ApplicationMaster {
LOG.info("Requested container ask: " + request.toString());
return request;
}
-
- /**
- * Ask RM to allocate given no. of containers to this Application Master
- *
- * @param requestedContainers Containers to ask for from RM
- * @return Response from RM to AM with allocated containers
- * @throws YarnRemoteException
- */
- private AMResponse sendContainerAskToRM() throws YarnRemoteException {
- float progressIndicator = (float) numCompletedContainers.get()
- / numTotalContainers;
-
- LOG.info("Sending request to RM for containers" + ", progress="
- + progressIndicator);
-
- AllocateResponse resp = resourceManager.allocate(progressIndicator);
- return resp.getAMResponse();
- }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Mon Apr 1 16:47:16 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.applications.distributedshell;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -481,14 +483,15 @@ public class Client extends YarnClientIm
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
- StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+ StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
+ .append(File.pathSeparatorChar).append("./*");
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
- classPathEnv.append(':');
+ classPathEnv.append(File.pathSeparatorChar);
classPathEnv.append(c.trim());
}
- classPathEnv.append(":./log4j.properties");
+ classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
// add the runtime classpath needed for tests to work
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
@@ -505,7 +508,7 @@ public class Client extends YarnClientIm
// Set java executable command
LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Mon Apr 1 16:47:16 2013
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -54,8 +55,8 @@ public class TestDistributedShell {
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
if (yarnCluster == null) {
- yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName(),
- 1, 1, 1);
+ yarnCluster = new MiniYARNCluster(
+ TestDistributedShell.class.getSimpleName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
@@ -82,7 +83,7 @@ public class TestDistributedShell {
}
}
- @Test
+ @Test(timeout=30000)
public void testDSShell() throws Exception {
String[] args = {
@@ -91,7 +92,7 @@ public class TestDistributedShell {
"--num_containers",
"2",
"--shell_command",
- "ls",
+ Shell.WINDOWS ? "dir" : "ls",
"--master_memory",
"512",
"--container_memory",
@@ -110,7 +111,7 @@ public class TestDistributedShell {
}
- @Test
+ @Test(timeout=30000)
public void testDSShellWithNoArgs() throws Exception {
String[] args = {};
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Mon Apr 1 16:47:16 2013
@@ -87,6 +87,16 @@
<build>
<plugins>
<plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.hadoop.yarn.applications.unmanagedamlauncher.UnmanagedAMLauncher</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Mon Apr 1 16:47:16 2013
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -81,6 +83,8 @@ public class UnmanagedAMLauncher {
// set the classpath explicitly
private String classpath = null;
+ private volatile boolean amCompleted = false;
+
/**
* @param args
* Command line arguments
@@ -179,8 +183,18 @@ public class UnmanagedAMLauncher {
if(!setClasspath && classpath!=null) {
envAMList.add("CLASSPATH="+classpath);
}
-
- envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
+
+ ContainerId containerId = Records.newRecord(ContainerId.class);
+ containerId.setApplicationAttemptId(attemptId);
+ containerId.setId(0);
+
+ String hostname = InetAddress.getLocalHost().getHostName();
+ envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId);
+ envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname);
+ envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0");
+ envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0");
+ envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
+ + System.currentTimeMillis());
String[] envAM = new String[envAMList.size()];
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
@@ -233,8 +247,10 @@ public class UnmanagedAMLauncher {
LOG.info("AM process exited with value: " + exitCode);
} catch (InterruptedException e) {
e.printStackTrace();
+ } finally {
+ amCompleted = true;
}
-
+
try {
// make sure that the error thread exits
// on Windows these threads sometimes get stuck and hang the execution
@@ -306,6 +322,7 @@ public class UnmanagedAMLauncher {
appReport = monitorApplication(appId, EnumSet.of(
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
YarnApplicationState.FINISHED));
+
YarnApplicationState appState = appReport.getYarnApplicationState();
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
@@ -341,6 +358,19 @@ public class UnmanagedAMLauncher {
private ApplicationReport monitorApplication(ApplicationId appId,
Set<YarnApplicationState> finalState) throws YarnRemoteException {
+ long foundAMCompletedTime = 0;
+ final int timeToWaitMS = 10000;
+ StringBuilder expectedFinalState = new StringBuilder();
+ boolean first = true;
+ for (YarnApplicationState state : finalState) {
+ if (first) {
+ first = false;
+ expectedFinalState.append(state.name());
+ } else {
+ expectedFinalState.append("," + state.name());
+ }
+ }
+
while (true) {
// Check app status every 1 second.
@@ -370,8 +400,24 @@ public class UnmanagedAMLauncher {
return report;
}
+ // wait for 10 seconds after process has completed for app report to
+ // come back
+ if (amCompleted) {
+ if (foundAMCompletedTime == 0) {
+ foundAMCompletedTime = System.currentTimeMillis();
+ } else if ((System.currentTimeMillis() - foundAMCompletedTime)
+ > timeToWaitMS) {
+ LOG.warn("Waited " + timeToWaitMS/1000
+ + " seconds after process completed for AppReport"
+ + " to reach desired final state. Not waiting anymore."
+ + "CurrentState = " + state
+ + ", ExpectedStates = " + expectedFinalState.toString());
+ throw new RuntimeException("Failed to receive final expected state"
+ + " in ApplicationReport"
+ + ", CurrentState=" + state
+ + ", ExpectedStates=" + expectedFinalState.toString());
+ }
+ }
}
-
}
-
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Mon Apr 1 16:47:16 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
+import static org.junit.Assert.fail;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -91,7 +93,7 @@ public class TestUnmanagedAMLauncher {
return envClassPath;
}
- @Test
+ @Test(timeout=10000)
public void testDSShell() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
@@ -99,7 +101,7 @@ public class TestUnmanagedAMLauncher {
LOG.fatal("JAVA_HOME not defined. Test not running.");
return;
}
- // start dist-shell with 0 containers because container launch will fail if
+ // start dist-shell with 0 containers because container launch will fail if
// there are no dist cache resources.
String[] args = {
"--classpath",
@@ -110,7 +112,7 @@ public class TestUnmanagedAMLauncher {
javaHome
+ "/bin/java -Xmx512m "
+ "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
- + "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
+ + "--container_memory 128 --num_containers 1 --priority 0 --shell_command ls" };
LOG.info("Initializing Launcher");
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
@@ -125,4 +127,40 @@ public class TestUnmanagedAMLauncher {
}
+ @Test(timeout=30000)
+ public void testDSShellError() throws Exception {
+ String classpath = getTestRuntimeClasspath();
+ String javaHome = System.getenv("JAVA_HOME");
+ if (javaHome == null) {
+ LOG.fatal("JAVA_HOME not defined. Test not running.");
+ return;
+ }
+
+ // remove shell command to make dist-shell fail in initialization itself
+ String[] args = {
+ "--classpath",
+ classpath,
+ "--queue",
+ "default",
+ "--cmd",
+ javaHome
+ + "/bin/java -Xmx512m "
+ + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
+ + "--container_memory 128 --num_containers 1 --priority 0" };
+
+ LOG.info("Initializing Launcher");
+ UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
+ yarnCluster.getConfig()));
+ boolean initSuccess = launcher.init(args);
+ Assert.assertTrue(initSuccess);
+ LOG.info("Running Launcher");
+
+ try {
+ launcher.run();
+ fail("Expected an exception to occur as launch should have failed");
+ } catch (RuntimeException e) {
+ // Expected
+ }
+ }
+
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Mon Apr 1 16:47:16 2013
@@ -32,6 +32,28 @@
<module>hadoop-yarn-applications-distributedshell</module>
<module>hadoop-yarn-applications-unmanaged-am-launcher</module>
</modules>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <!-- HADOOP_HOME required for tests on Windows to find winutils -->
+ <HADOOP_HOME>${basedir}/../../../../hadoop-common-project/hadoop-common/target</HADOOP_HOME>
+ </environmentVariables>
+ <properties>
+ <property>
+ <name>listener</name>
+ <value>org.apache.hadoop.test.TimedOutTestsListener</value>
+ </property>
+ </properties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
<profiles>
<profile>
<id>clover</id>
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Mon Apr 1 16:47:16 2013
@@ -35,11 +35,6 @@ import org.apache.hadoop.yarn.service.Se
public interface AMRMClient extends Service {
/**
- * Value used to define no locality
- */
- static final String ANY = "*";
-
- /**
* Object to represent container request for resources.
* Resources may be localized to nodes and racks.
* Resources may be assigned priorities.
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Mon Apr 1 16:47:16 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -194,13 +193,12 @@ public class AMRMClientImpl extends Abst
}
allocateResponse = rmClient.allocate(allocateRequest);
- AMResponse response = allocateResponse.getAMResponse();
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
- lastResponseId = response.getResponseId();
- clusterAvailableResources = response.getAvailableResources();
+ lastResponseId = allocateResponse.getResponseId();
+ clusterAvailableResources = allocateResponse.getAvailableResources();
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -260,7 +258,8 @@ public class AMRMClientImpl extends Abst
}
// Off-switch
- addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+ req.containerCount);
}
@Override
@@ -278,7 +277,8 @@ public class AMRMClientImpl extends Abst
}
}
- decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+ decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+ req.containerCount);
}
@Override
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java Mon Apr 1 16:47:16 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.client.cli;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
@@ -31,7 +33,9 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ApplicationCLI extends YarnCLI {
- private static final String APPLICATIONS_PATTERN = "%30s\t%20s\t%10s\t%10s\t%18s\t%18s\t%35s\n";
+ private static final String APPLICATIONS_PATTERN =
+ "%30s\t%20s\t%10s\t%10s\t%18s\t%18s\t%35s" +
+ System.getProperty("line.separator");
public static void main(String[] args) throws Exception {
ApplicationCLI cli = new ApplicationCLI();
@@ -123,37 +127,44 @@ public class ApplicationCLI extends Yarn
* @throws YarnRemoteException
*/
private void printApplicationReport(String applicationId)
- throws YarnRemoteException {
+ throws YarnRemoteException, IOException {
ApplicationReport appReport = client.getApplicationReport(ConverterUtils
.toApplicationId(applicationId));
- StringBuffer appReportStr = new StringBuffer();
+ // Use PrintWriter.println, which uses correct platform line ending.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter appReportStr = new PrintWriter(baos);
if (appReport != null) {
- appReportStr.append("Application Report : ");
- appReportStr.append("\n\tApplication-Id : ");
- appReportStr.append(appReport.getApplicationId());
- appReportStr.append("\n\tApplication-Name : ");
- appReportStr.append(appReport.getName());
- appReportStr.append("\n\tUser : ");
- appReportStr.append(appReport.getUser());
- appReportStr.append("\n\tQueue : ");
- appReportStr.append(appReport.getQueue());
- appReportStr.append("\n\tStart-Time : ");
- appReportStr.append(appReport.getStartTime());
- appReportStr.append("\n\tFinish-Time : ");
- appReportStr.append(appReport.getFinishTime());
- appReportStr.append("\n\tState : ");
- appReportStr.append(appReport.getYarnApplicationState());
- appReportStr.append("\n\tFinal-State : ");
- appReportStr.append(appReport.getFinalApplicationStatus());
- appReportStr.append("\n\tTracking-URL : ");
- appReportStr.append(appReport.getOriginalTrackingUrl());
- appReportStr.append("\n\tDiagnostics : ");
- appReportStr.append(appReport.getDiagnostics());
+ appReportStr.println("Application Report : ");
+ appReportStr.print("\tApplication-Id : ");
+ appReportStr.println(appReport.getApplicationId());
+ appReportStr.print("\tApplication-Name : ");
+ appReportStr.println(appReport.getName());
+ appReportStr.print("\tUser : ");
+ appReportStr.println(appReport.getUser());
+ appReportStr.print("\tQueue : ");
+ appReportStr.println(appReport.getQueue());
+ appReportStr.print("\tStart-Time : ");
+ appReportStr.println(appReport.getStartTime());
+ appReportStr.print("\tFinish-Time : ");
+ appReportStr.println(appReport.getFinishTime());
+ appReportStr.print("\tState : ");
+ appReportStr.println(appReport.getYarnApplicationState());
+ appReportStr.print("\tFinal-State : ");
+ appReportStr.println(appReport.getFinalApplicationStatus());
+ appReportStr.print("\tTracking-URL : ");
+ appReportStr.println(appReport.getOriginalTrackingUrl());
+ appReportStr.print("\tRPC Port : ");
+ appReportStr.println(appReport.getRpcPort());
+ appReportStr.print("\tAM Host : ");
+ appReportStr.println(appReport.getHost());
+ appReportStr.print("\tDiagnostics : ");
+ appReportStr.print(appReport.getDiagnostics());
} else {
- appReportStr.append("Application with id '" + applicationId
+ appReportStr.print("Application with id '" + applicationId
+ "' doesn't exist in RM.");
}
- sysout.println(appReportStr.toString());
+ appReportStr.close();
+ sysout.println(baos.toString("UTF-8"));
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java Mon Apr 1 16:47:16 2013
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.yarn.client.cli;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.PrintWriter;
+import java.util.Date;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -31,7 +35,9 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.util.ConverterUtils;
public class NodeCLI extends YarnCLI {
- private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%26s\t%18s\n";
+ private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%26s\t%18s" +
+ System.getProperty("line.separator");
+
public static void main(String[] args) throws Exception {
NodeCLI cli = new NodeCLI();
cli.setSysOutPrintStream(System.out);
@@ -100,48 +106,52 @@ public class NodeCLI extends YarnCLI {
* @param nodeIdStr
* @throws YarnRemoteException
*/
- private void printNodeStatus(String nodeIdStr) throws YarnRemoteException {
+ private void printNodeStatus(String nodeIdStr) throws YarnRemoteException,
+ IOException {
NodeId nodeId = ConverterUtils.toNodeId(nodeIdStr);
List<NodeReport> nodesReport = client.getNodeReports();
- StringBuffer nodeReportStr = new StringBuffer();
+ // Use PrintWriter.println, which uses correct platform line ending.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter nodeReportStr = new PrintWriter(baos);
NodeReport nodeReport = null;
for (NodeReport report : nodesReport) {
if (!report.getNodeId().equals(nodeId)) {
continue;
}
nodeReport = report;
- nodeReportStr.append("Node Report : ");
- nodeReportStr.append("\n\tNode-Id : ");
- nodeReportStr.append(nodeReport.getNodeId());
- nodeReportStr.append("\n\tRack : ");
- nodeReportStr.append(nodeReport.getRackName());
- nodeReportStr.append("\n\tNode-State : ");
- nodeReportStr.append(nodeReport.getNodeState());
- nodeReportStr.append("\n\tNode-Http-Address : ");
- nodeReportStr.append(nodeReport.getHttpAddress());
- nodeReportStr.append("\n\tHealth-Status(isNodeHealthy) : ");
- nodeReportStr.append(nodeReport.getNodeHealthStatus()
+ nodeReportStr.println("Node Report : ");
+ nodeReportStr.print("\tNode-Id : ");
+ nodeReportStr.println(nodeReport.getNodeId());
+ nodeReportStr.print("\tRack : ");
+ nodeReportStr.println(nodeReport.getRackName());
+ nodeReportStr.print("\tNode-State : ");
+ nodeReportStr.println(nodeReport.getNodeState());
+ nodeReportStr.print("\tNode-Http-Address : ");
+ nodeReportStr.println(nodeReport.getHttpAddress());
+ nodeReportStr.print("\tHealth-Status(isNodeHealthy) : ");
+ nodeReportStr.println(nodeReport.getNodeHealthStatus()
.getIsNodeHealthy());
- nodeReportStr.append("\n\tLast-Last-Health-Update : ");
- nodeReportStr.append(nodeReport.getNodeHealthStatus()
- .getLastHealthReportTime());
- nodeReportStr.append("\n\tHealth-Report : ");
+ nodeReportStr.print("\tLast-Health-Update : ");
+ nodeReportStr.println(DateFormatUtils.format(
+ new Date(nodeReport.getNodeHealthStatus().
+ getLastHealthReportTime()),"E dd/MMM/yy hh:mm:ss:SSzz"));
+ nodeReportStr.print("\tHealth-Report : ");
nodeReportStr
- .append(nodeReport.getNodeHealthStatus().getHealthReport());
- nodeReportStr.append("\n\tContainers : ");
- nodeReportStr.append(nodeReport.getNumContainers());
- nodeReportStr.append("\n\tMemory-Used : ");
- nodeReportStr.append((nodeReport.getUsed() == null) ? "0M"
+ .println(nodeReport.getNodeHealthStatus().getHealthReport());
+ nodeReportStr.print("\tContainers : ");
+ nodeReportStr.println(nodeReport.getNumContainers());
+ nodeReportStr.print("\tMemory-Used : ");
+ nodeReportStr.println((nodeReport.getUsed() == null) ? "0M"
: (nodeReport.getUsed().getMemory() + "M"));
- nodeReportStr.append("\n\tMemory-Capacity : ");
- nodeReportStr.append(nodeReport.getCapability().getMemory());
+ nodeReportStr.print("\tMemory-Capacity : ");
+ nodeReportStr.println(nodeReport.getCapability().getMemory());
}
if (nodeReport == null) {
- nodeReportStr.append("Could not find the node report for node id : "
+ nodeReportStr.print("Could not find the node report for node id : "
+ nodeIdStr);
}
-
- sysout.println(nodeReportStr.toString());
+ nodeReportStr.close();
+ sysout.println(baos.toString("UTF-8"));
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Mon Apr 1 16:47:16 2013
@@ -18,25 +18,21 @@
package org.apache.hadoop.yarn.client;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -58,6 +54,11 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestAMRMClient {
Configuration conf = null;
@@ -183,7 +184,7 @@ public class TestAMRMClient {
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
.get(rack).get(capability).getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(AMRMClient.ANY).get(capability).getNumContainers();
+ .get(ResourceRequest.ANY).get(capability).getNumContainers();
assertTrue(containersRequestedNode == 2);
assertTrue(containersRequestedRack == 2);
@@ -202,9 +203,8 @@ public class TestAMRMClient {
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
- AMResponse amResponse = allocResponse.getAMResponse();
- allocatedContainerCount += amResponse.getAllocatedContainers().size();
- for(Container container : amResponse.getAllocatedContainers()) {
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+ for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
@@ -264,11 +264,11 @@ public class TestAMRMClient {
while(!releases.isEmpty() || iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
- AMResponse amResponse = allocResponse.getAMResponse();
// RM did not send new containers because AM does not need any
- assertTrue(amResponse.getAllocatedContainers().size() == 0);
- if(amResponse.getCompletedContainersStatuses().size() > 0) {
- for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
+ assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+ if(allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for(ContainerStatus cStatus :allocResponse
+ .getCompletedContainersStatuses()) {
if(releases.contains(cStatus.getContainerId())) {
assertTrue(cStatus.getState() == ContainerState.COMPLETE);
assertTrue(cStatus.getExitStatus() == -100);
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java Mon Apr 1 16:47:16 2013
@@ -29,11 +29,14 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import junit.framework.Assert;
+import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -79,12 +82,23 @@ public class TestYarnCLI {
int result = cli.run(new String[] { "-status", applicationId.toString() });
assertEquals(0, result);
verify(client).getApplicationReport(applicationId);
- String appReportStr = "Application Report : \n\t"
- + "Application-Id : application_1234_0005\n\t"
- + "Application-Name : appname\n\tUser : user\n\t"
- + "Queue : queue\n\tStart-Time : 0\n\tFinish-Time : 0\n\t"
- + "State : FINISHED\n\tFinal-State : SUCCEEDED\n\t"
- + "Tracking-URL : N/A\n\tDiagnostics : diagnostics\n";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ pw.println("Application Report : ");
+ pw.println("\tApplication-Id : application_1234_0005");
+ pw.println("\tApplication-Name : appname");
+ pw.println("\tUser : user");
+ pw.println("\tQueue : queue");
+ pw.println("\tStart-Time : 0");
+ pw.println("\tFinish-Time : 0");
+ pw.println("\tState : FINISHED");
+ pw.println("\tFinal-State : SUCCEEDED");
+ pw.println("\tTracking-URL : N/A");
+ pw.println("\tRPC Port : 124");
+ pw.println("\tAM Host : host");
+ pw.println("\tDiagnostics : diagnostics");
+ pw.close();
+ String appReportStr = baos.toString("UTF-8");
Assert.assertEquals(appReportStr, sysOutStream.toString());
verify(sysOut, times(1)).println(isA(String.class));
}
@@ -105,16 +119,18 @@ public class TestYarnCLI {
assertEquals(0, result);
verify(client).getApplicationList();
- StringBuffer appsReportStrBuf = new StringBuffer();
- appsReportStrBuf.append("Total Applications:1\n");
- appsReportStrBuf
- .append(" Application-Id\t Application-Name"
- + "\t User\t Queue\t State\t "
- + "Final-State\t Tracking-URL\n");
- appsReportStrBuf.append(" application_1234_0005\t "
- + "appname\t user\t queue\t FINISHED\t "
- + "SUCCEEDED\t N/A\n");
- Assert.assertEquals(appsReportStrBuf.toString(), sysOutStream.toString());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ pw.println("Total Applications:1");
+ pw.print(" Application-Id\t Application-Name");
+ pw.print("\t User\t Queue\t State\t ");
+ pw.println("Final-State\t Tracking-URL");
+ pw.print(" application_1234_0005\t ");
+ pw.print("appname\t user\t queue\t FINISHED\t ");
+ pw.println("SUCCEEDED\t N/A");
+ pw.close();
+ String appsReportStr = baos.toString("UTF-8");
+ Assert.assertEquals(appsReportStr, sysOutStream.toString());
verify(sysOut, times(1)).write(any(byte[].class), anyInt(), anyInt());
}
@@ -137,18 +153,20 @@ public class TestYarnCLI {
int result = cli.run(new String[] { "-list" });
assertEquals(0, result);
verify(client).getNodeReports();
- StringBuffer nodesReportStr = new StringBuffer();
- nodesReportStr.append("Total Nodes:3");
- nodesReportStr
- .append("\n Node-Id\tNode-State\tNode-Http-Address\t"
- + "Health-Status(isNodeHealthy)\tRunning-Containers");
- nodesReportStr.append("\n host0:0\t RUNNING\t host1:8888"
- + "\t false\t 0");
- nodesReportStr.append("\n host1:0\t RUNNING\t host1:8888"
- + "\t false\t 0");
- nodesReportStr.append("\n host2:0\t RUNNING\t host1:8888"
- + "\t false\t 0\n");
- Assert.assertEquals(nodesReportStr.toString(), sysOutStream.toString());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ pw.println("Total Nodes:3");
+ pw.print(" Node-Id\tNode-State\tNode-Http-Address\t");
+ pw.println("Health-Status(isNodeHealthy)\tRunning-Containers");
+ pw.print(" host0:0\t RUNNING\t host1:8888");
+ pw.println("\t false\t 0");
+ pw.print(" host1:0\t RUNNING\t host1:8888");
+ pw.println("\t false\t 0");
+ pw.print(" host2:0\t RUNNING\t host1:8888");
+ pw.println("\t false\t 0");
+ pw.close();
+ String nodesReportStr = baos.toString("UTF-8");
+ Assert.assertEquals(nodesReportStr, sysOutStream.toString());
verify(sysOut, times(1)).write(any(byte[].class), anyInt(), anyInt());
}
@@ -163,11 +181,22 @@ public class TestYarnCLI {
int result = cli.run(new String[] { "-status", nodeId.toString() });
assertEquals(0, result);
verify(client).getNodeReports();
- String nodeStatusStr = "Node Report : \n\tNode-Id : host0:0\n\t"
- + "Rack : rack1\n\tNode-State : RUNNING\n\t"
- + "Node-Http-Address : host1:8888\n\tHealth-Status(isNodeHealthy) "
- + ": false\n\tLast-Last-Health-Update : 0\n\tHealth-Report : null"
- + "\n\tContainers : 0\n\tMemory-Used : 0M\n\tMemory-Capacity : 0";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(baos);
+ pw.println("Node Report : ");
+ pw.println("\tNode-Id : host0:0");
+ pw.println("\tRack : rack1");
+ pw.println("\tNode-State : RUNNING");
+ pw.println("\tNode-Http-Address : host1:8888");
+ pw.println("\tHealth-Status(isNodeHealthy) : false");
+ pw.println("\tLast-Health-Update : "
+ + DateFormatUtils.format(new Date(0), "E dd/MMM/yy hh:mm:ss:SSzz"));
+ pw.println("\tHealth-Report : null");
+ pw.println("\tContainers : 0");
+ pw.println("\tMemory-Used : 0M");
+ pw.println("\tMemory-Capacity : 0");
+ pw.close();
+ String nodeStatusStr = baos.toString("UTF-8");
verify(sysOut, times(1)).println(isA(String.class));
verify(sysOut).println(nodeStatusStr);
}
@@ -225,4 +254,4 @@ public class TestYarnCLI {
return cli;
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Mon Apr 1 16:47:16 2013
@@ -180,10 +180,13 @@ public class YarnConfiguration extends C
RM_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
- /** The maximum number of application master retries.*/
- public static final String RM_AM_MAX_RETRIES =
- RM_PREFIX + "am.max-retries";
- public static final int DEFAULT_RM_AM_MAX_RETRIES = 1;
+ /**
+ * The maximum number of application attempts.
+ * It's a global setting for all application masters.
+ */
+ public static final String RM_AM_MAX_ATTEMPTS =
+ RM_PREFIX + "am.max-attempts";
+ public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 1;
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
@@ -304,6 +307,17 @@ public class YarnConfiguration extends C
/** who will execute(launch) the containers.*/
public static final String NM_CONTAINER_EXECUTOR =
NM_PREFIX + "container-executor.class";
+
+ /**
+ * Adjustment to make to the container os scheduling priority.
+ * The valid values for this could vary depending on the platform.
+ * On Linux, higher values mean run the containers at a less
+ * favorable priority than the NM.
+ * The value specified is an int.
+ */
+ public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY =
+ NM_PREFIX + "container-executor.os.sched.priority.adjustment";
+ public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0;
/** Number of threads container manager uses.*/
public static final String NM_CONTAINER_MGR_THREAD_COUNT =
@@ -426,6 +440,16 @@ public class YarnConfiguration extends C
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
+ /** Specifies whether physical memory check is enabled. */
+ public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX
+ + "pmem-check-enabled";
+ public static final boolean DEFAULT_NM_PMEM_CHECK_ENABLED = true;
+
+ /** Specifies whether virtual memory check is enabled. */
+ public static final String NM_VMEM_CHECK_ENABLED = NM_PREFIX
+ + "vmem-check-enabled";
+ public static final boolean DEFAULT_NM_VMEM_CHECK_ENABLED = true;
+
/** Conversion ratio for physical memory to virtual memory. */
public static final String NM_VMEM_PMEM_RATIO =
NM_PREFIX + "vmem-pmem-ratio";
@@ -610,6 +634,20 @@ public class YarnConfiguration extends C
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
+ /** Max time to wait to establish a connection to RM when NM starts
+ */
+ public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
+ NM_PREFIX + "resourcemanager.connect.wait.secs";
+ public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+ 15*60;
+
+ /** Time interval between each NM attempt to connect to RM
+ */
+ public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
+ NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+ public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ = 30;
+
/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java Mon Apr 1 16:47:16 2013
@@ -27,6 +27,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
+import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -231,7 +232,6 @@ public class AggregatedLogFormat {
out = this.writer.prepareAppendValue(-1);
out.writeInt(VERSION);
out.close();
- this.fsDataOStream.hflush();
}
public void writeApplicationOwner(String user) throws IOException {
@@ -506,7 +506,7 @@ public class AggregatedLogFormat {
* @throws IOException
*/
public static void readAContainerLogsForALogType(
- DataInputStream valueStream, DataOutputStream out)
+ DataInputStream valueStream, PrintStream out)
throws IOException {
byte[] buf = new byte[65535];
@@ -514,11 +514,11 @@ public class AggregatedLogFormat {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
- out.writeUTF("\nLogType:");
- out.writeUTF(fileType);
- out.writeUTF("\nLogLength:");
- out.writeUTF(fileLengthStr);
- out.writeUTF("\nLog Contents:\n");
+ out.print("LogType: ");
+ out.println(fileType);
+ out.print("LogLength: ");
+ out.println(fileLengthStr);
+ out.println("Log Contents:");
int curRead = 0;
long pendingRead = fileLength - curRead;
@@ -534,6 +534,7 @@ public class AggregatedLogFormat {
pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
+ out.println("");
}
public void close() throws IOException {
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java Mon Apr 1 16:47:16 2013
@@ -19,10 +19,10 @@
package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.PrintStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -30,6 +30,7 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileContext;
@@ -57,10 +58,13 @@ public class LogDumper extends Configure
public int run(String[] args) throws Exception {
Options opts = new Options();
- opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId");
- opts.addOption(CONTAINER_ID_OPTION, true, "ContainerId");
- opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress");
- opts.addOption(APP_OWNER_OPTION, true, "AppOwner");
+ opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId (required)");
+ opts.addOption(CONTAINER_ID_OPTION, true,
+ "ContainerId (must be specified if node address is specified)");
+ opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
+ + "nodename:port (must be specified if container id is specified)");
+ opts.addOption(APP_OWNER_OPTION, true,
+ "AppOwner (assumed to be current user if not specified)");
if (args.length < 1) {
HelpFormatter formatter = new HelpFormatter();
@@ -99,14 +103,12 @@ public class LogDumper extends Configure
ApplicationId appId =
ConverterUtils.toApplicationId(recordFactory, appIdStr);
- DataOutputStream out = new DataOutputStream(System.out);
-
if (appOwner == null || appOwner.isEmpty()) {
appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
}
int resultCode = 0;
if (containerIdStr == null && nodeAddress == null) {
- resultCode = dumpAllContainersLogs(appId, appOwner, out);
+ resultCode = dumpAllContainersLogs(appId, appOwner, System.out);
} else if ((containerIdStr == null && nodeAddress != null)
|| (containerIdStr != null && nodeAddress == null)) {
System.out.println("ContainerId or NodeAddress cannot be null!");
@@ -125,7 +127,7 @@ public class LogDumper extends Configure
appOwner,
ConverterUtils.toNodeId(nodeAddress),
LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
- resultCode = dumpAContainerLogs(containerIdStr, reader, out);
+ resultCode = dumpAContainerLogs(containerIdStr, reader, System.out);
}
return resultCode;
@@ -149,12 +151,11 @@ public class LogDumper extends Configure
"Log aggregation has not completed or is not enabled.");
return -1;
}
- DataOutputStream out = new DataOutputStream(System.out);
- return dumpAContainerLogs(containerId, reader, out);
+ return dumpAContainerLogs(containerId, reader, System.out);
}
private int dumpAContainerLogs(String containerIdStr,
- AggregatedLogFormat.LogReader reader, DataOutputStream out)
+ AggregatedLogFormat.LogReader reader, PrintStream out)
throws IOException {
DataInputStream valueStream;
LogKey key = new LogKey();
@@ -183,7 +184,7 @@ public class LogDumper extends Configure
}
private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
- DataOutputStream out) throws IOException {
+ PrintStream out) throws IOException {
Path remoteRootLogDir =
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -216,6 +217,9 @@ public class LogDumper extends Configure
valueStream = reader.next(key);
while (valueStream != null) {
+ String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
+ out.println(containerString);
+ out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
LogReader.readAContainerLogsForALogType(valueStream, out);
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Mon Apr 1 16:47:16 2013
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
allocateRequest.addAllReleases(containersToBeReleased);
return allocateRequest;
}
+
+ public static AllocateResponse newAllocateResponse(int responseId,
+ List<ContainerStatus> completedContainers,
+ List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+ Resource availResources, boolean reboot, int numClusterNodes) {
+ AllocateResponse response = recordFactory
+ .newRecordInstance(AllocateResponse.class);
+ response.setNumClusterNodes(numClusterNodes);
+ response.setResponseId(responseId);
+ response.setCompletedContainersStatuses(completedContainers);
+ response.setAllocatedContainers(allocatedContainers);
+ response.setUpdatedNodes(updatedNodes);
+ response.setAvailableResources(availResources);
+ response.setReboot(reboot);
+
+ return response;
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Mon Apr 1 16:47:16 2013
@@ -251,6 +251,12 @@ public class FSDownload implements Calla
}
break;
}
+ if(localrsrc.isFile()){
+ try {
+ files.delete(new Path(localrsrc.toString()), false);
+ } catch (IOException ignore) {
+ }
+ }
return 0;
// TODO Should calculate here before returning
//return FileUtil.getDU(destDir);
@@ -264,41 +270,41 @@ public class FSDownload implements Calla
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
-
Path tmp;
do {
tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
} while (files.util().exists(tmp));
destDirPath = tmp;
-
createDir(destDirPath, cachePerms);
final Path dst_work = new Path(destDirPath + "_tmp");
createDir(dst_work, cachePerms);
-
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
try {
- Path dTmp = null == userUgi
- ? files.makeQualified(copy(sCopy, dst_work))
- : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
+ Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
+ : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
public Path run() throws Exception {
return files.makeQualified(copy(sCopy, dst_work));
};
});
Pattern pattern = null;
String p = resource.getPattern();
- if(p != null) {
+ if (p != null) {
pattern = Pattern.compile(p);
}
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()), pattern);
changePermissions(dFinal.getFileSystem(conf), dFinal);
files.rename(dst_work, destDirPath, Rename.OVERWRITE);
} catch (Exception e) {
- try { files.delete(destDirPath, true); } catch (IOException ignore) { }
+ try {
+ files.delete(destDirPath, true);
+ } catch (IOException ignore) {
+ }
throw e;
} finally {
try {
files.delete(dst_work, true);
- } catch (FileNotFoundException ignore) { }
+ } catch (FileNotFoundException ignore) {
+ }
// clear ref to internal var
rand = null;
conf = null;
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java Mon Apr 1 16:47:16 2013
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
@@ -59,32 +60,30 @@ public class ProcfsBasedProcessTree exte
public static final String PROCFS_STAT_FILE = "stat";
public static final String PROCFS_CMDLINE_FILE = "cmdline";
public static final long PAGE_SIZE;
- static {
- ShellCommandExecutor shellExecutor =
- new ShellCommandExecutor(new String[]{"getconf", "PAGESIZE"});
- long pageSize = -1;
- try {
- shellExecutor.execute();
- pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
- } catch (IOException e) {
- LOG.error(StringUtils.stringifyException(e));
- } finally {
- PAGE_SIZE = pageSize;
- }
- }
public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
+
static {
- ShellCommandExecutor shellExecutor =
- new ShellCommandExecutor(new String[]{"getconf", "CLK_TCK"});
long jiffiesPerSecond = -1;
+ long pageSize = -1;
try {
- shellExecutor.execute();
- jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+ if(Shell.LINUX) {
+ ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor(
+ new String[] { "getconf", "CLK_TCK" });
+ shellExecutorClk.execute();
+ jiffiesPerSecond = Long.parseLong(shellExecutorClk.getOutput().replace("\n", ""));
+
+ ShellCommandExecutor shellExecutorPage = new ShellCommandExecutor(
+ new String[] { "getconf", "PAGESIZE" });
+ shellExecutorPage.execute();
+ pageSize = Long.parseLong(shellExecutorPage.getOutput().replace("\n", ""));
+
+ }
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
Math.round(1000D / jiffiesPerSecond) : -1;
+ PAGE_SIZE = pageSize;
}
}
@@ -126,8 +125,7 @@ public class ProcfsBasedProcessTree exte
*/
public static boolean isAvailable() {
try {
- String osName = System.getProperty("os.name");
- if (!osName.startsWith("Linux")) {
+ if (!Shell.LINUX) {
LOG.info("ProcfsBasedProcessTree currently is supported only on "
+ "Linux.");
return false;