You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/15 01:19:10 UTC
svn commit: r1493281 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
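hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/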
Author: vinodkv
Date: Fri Jun 14 23:19:09 2013
New Revision: 1493281
URL: http://svn.apache.org/r1493281
Log:
YARN-639. Modified Distributed Shell application to start using the new NMClient library. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1493280 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1493281&r1=1493280&r2=1493281&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Jun 14 23:19:09 2013
@@ -329,6 +329,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-789. Enable zero capabilities resource requests in fair scheduler.
(tucu)
+ YARN-639. Modified Distributed Shell application to start using the new
+ NMClient library. (Zhijie Shen via vinodkv)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1493281&r1=1493280&r2=1493281&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Fri Jun 14 23:19:09 2013
@@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.applicati
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.security.PrivilegedAction;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
@@ -42,9 +43,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -71,12 +69,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -147,11 +143,15 @@ public class ApplicationMaster {
// Configuration
private Configuration conf;
- // YARN RPC to communicate with the Resource Manager or Node Manager
- private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMClientAsync<ContainerRequest> resourceManager;
+ @SuppressWarnings("rawtypes")
+ private AMRMClientAsync resourceManager;
+
+ // Handle to communicate with the Node Manager
+ private NMClientAsync nmClientAsync;
+ // Listener that processes the responses from the Node Manager
+ private NMCallbackHandler containerListener;
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@@ -273,7 +273,6 @@ public class ApplicationMaster {
public ApplicationMaster() throws Exception {
// Set up the configuration and RPC
conf = new YarnConfiguration();
- rpc = YarnRPC.create(conf);
}
/**
@@ -437,17 +436,20 @@ public class ApplicationMaster {
* @throws YarnException
* @throws IOException
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
-
- resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID,
- 1000,
- allocListener);
+ resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
+ containerListener = new NMCallbackHandler();
+ nmClientAsync = new NMClientAsync(containerListener);
+ nmClientAsync.init(conf);
+ nmClientAsync.start();
+
// Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to
// the RPC server
@@ -517,6 +519,10 @@ public class ApplicationMaster {
}
}
+ // When the application completes, it should stop all running containers
+ LOG.info("Application completed. Stopping running containers");
+ nmClientAsync.stop();
+
// When the application completes, it should send a finish application
// signal to the RM
LOG.info("Application completed. Signalling finish to RM");
@@ -548,6 +554,7 @@ public class ApplicationMaster {
}
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @SuppressWarnings("unchecked")
@Override
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt="
@@ -618,8 +625,8 @@ public class ApplicationMaster {
// + ", containerToken"
// +allocatedContainer.getContainerToken().getIdentifier().toString());
- LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
- allocatedContainer);
+ LaunchContainerRunnable runnableLaunchContainer =
+ new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep
@@ -652,6 +659,64 @@ public class ApplicationMaster {
}
}
+ private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+ private ConcurrentMap<ContainerId, Container> containers =
+ new ConcurrentHashMap<ContainerId, Container>();
+
+ public void addContainer(ContainerId containerId, Container container) {
+ containers.putIfAbsent(containerId, container);
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to stop Container " + containerId);
+ }
+ containers.remove(containerId);
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container Status: id=" + containerId + ", status=" +
+ containerStatus);
+ }
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to start Container " + containerId);
+ }
+ Container container = containers.get(containerId);
+ if (container != null) {
+ nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+ container.getContainerToken());
+ }
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to start Container " + containerId);
+ containers.remove(containerId);
+ }
+
+ @Override
+ public void onGetContainerStatusError(
+ ContainerId containerId, Throwable t) {
+ LOG.error("Failed to query the status of Container " + containerId);
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to stop Container " + containerId);
+ containers.remove(containerId);
+ }
+ }
+
/**
* Thread to connect to the {@link ContainerManager} and launch the container
* that will execute the shell command.
@@ -660,40 +725,17 @@ public class ApplicationMaster {
// Allocated container
Container container;
- // Handle to communicate with ContainerManager
- ContainerManager cm;
+
+ NMCallbackHandler containerListener;
/**
* @param lcontainer Allocated container
+ * @param containerListener Callback handler of the container
*/
- public LaunchContainerRunnable(Container lcontainer) {
+ public LaunchContainerRunnable(
+ Container lcontainer, NMCallbackHandler containerListener) {
this.container = lcontainer;
- }
-
- /**
- * Helper function to connect to CM
- */
- private void connectToCM() {
- LOG.debug("Connecting to ContainerManager for containerid="
- + container.getId());
- String cmIpPortStr = container.getNodeId().getHost() + ":"
- + container.getNodeId().getPort();
- final InetSocketAddress cmAddress =
- NetUtils.createSocketAddr(cmIpPortStr);
- LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(container.getId().toString());
- Token<ContainerTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
- cmAddress);
- ugi.addToken(token);
- this.cm = ugi.doAs(new PrivilegedAction<ContainerManager>() {
- @Override
- public ContainerManager run() {
- return ((ContainerManager) rpc.getProxy(ContainerManager.class,
- cmAddress, conf));
- }
- });
+ this.containerListener = containerListener;
}
@Override
@@ -703,9 +745,6 @@ public class ApplicationMaster {
* start request to the CM.
*/
public void run() {
- // Connect to ContainerManager
- connectToCM();
-
LOG.info("Setting up container launch container for containerid="
+ container.getId());
ContainerLaunchContext ctx = Records
@@ -773,40 +812,8 @@ public class ApplicationMaster {
commands.add(command.toString());
ctx.setCommands(commands);
- StartContainerRequest startReq = Records
- .newRecord(StartContainerRequest.class);
- startReq.setContainerLaunchContext(ctx);
- startReq.setContainerToken(container.getContainerToken());
- try {
- cm.startContainer(startReq);
- } catch (YarnException e) {
- LOG.info("Start container failed for :" + ", containerId="
- + container.getId());
- e.printStackTrace();
- // TODO do we need to release this container?
- } catch (IOException e) {
- LOG.info("Start container failed for :" + ", containerId="
- + container.getId());
- e.printStackTrace();
- }
-
- // Get container status?
- // Left commented out as the shell scripts are short lived
- // and we are relying on the status for completed containers
- // from RM to detect status
-
- // GetContainerStatusRequest statusReq =
- // Records.newRecord(GetContainerStatusRequest.class);
- // statusReq.setContainerId(container.getId());
- // GetContainerStatusResponse statusResp;
- // try {
- // statusResp = cm.getContainerStatus(statusReq);
- // LOG.info("Container Status"
- // + ", id=" + container.getId()
- // + ", status=" +statusResp.getStatus());
- // } catch (YarnException e) {
- // e.printStackTrace();
- // }
+ containerListener.addContainer(container.getId(), container);
+ nmClientAsync.startContainer(container, ctx);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1493281&r1=1493280&r2=1493281&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Fri Jun 14 23:19:09 2013
@@ -123,21 +123,76 @@ public class TestDistributedShell {
}
@Test(timeout=30000)
- public void testDSShellWithNoArgs() throws Exception {
-
- String[] args = {};
+ public void testDSShellWithInvalidArgs() throws Exception {
+ Client client = new Client(new Configuration(yarnCluster.getConfig()));
LOG.info("Initializing DS Client with no args");
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- boolean exceptionThrown = false;
try {
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
+ client.init(new String[]{});
+ Assert.fail("Exception is expected");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue("The throw exception is not expected",
+ e.getMessage().contains("No args"));
+ }
+
+ LOG.info("Initializing DS Client with no jar file");
+ try {
+ String[] args = {
+ "--num_containers",
+ "2",
+ "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128"
+ };
+ client.init(args);
+ Assert.fail("Exception is expected");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue("The throw exception is not expected",
+ e.getMessage().contains("No jar"));
}
- catch (IllegalArgumentException e) {
- exceptionThrown = true;
+
+ LOG.info("Initializing DS Client with no shell command");
+ try {
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "2",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128"
+ };
+ client.init(args);
+ Assert.fail("Exception is expected");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue("The throw exception is not expected",
+ e.getMessage().contains("No shell command"));
+ }
+
+ LOG.info("Initializing DS Client with invalid no. of containers");
+ try {
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "-1",
+ "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128"
+ };
+ client.init(args);
+ Assert.fail("Exception is expected");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue("The throw exception is not expected",
+ e.getMessage().contains("Invalid no. of containers"));
}
- Assert.assertTrue(exceptionThrown);
}
}