You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/15 01:18:34 UTC

svn commit: r1493280 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-applicatio...

Author: vinodkv
Date: Fri Jun 14 23:18:34 2013
New Revision: 1493280

URL: http://svn.apache.org/r1493280
Log:
YARN-639. Modified Distributed Shell application to start using the new NMClient library. Contributed by Zhijie Shen.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1493280&r1=1493279&r2=1493280&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Jun 14 23:18:34 2013
@@ -349,6 +349,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-789. Enable zero capabilities resource requests in fair scheduler. 
     (tucu)
 
+    YARN-639. Modified Distributed Shell application to start using the new
+    NMClient library. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1493280&r1=1493279&r2=1493280&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Fri Jun 14 23:18:34 2013
@@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.applicati
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.security.PrivilegedAction;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.CommandLine;
@@ -42,9 +43,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -71,12 +69,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -147,11 +143,15 @@ public class ApplicationMaster {
 
   // Configuration
   private Configuration conf;
-  // YARN RPC to communicate with the Resource Manager or Node Manager
-  private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMClientAsync<ContainerRequest> resourceManager;
+  @SuppressWarnings("rawtypes")
+  private AMRMClientAsync resourceManager;
+
+  // Handle to communicate with the Node Manager
+  private NMClientAsync nmClientAsync;
+  // Listener that processes the responses from the Node Manager
+  private NMCallbackHandler containerListener;
   
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
@@ -273,7 +273,6 @@ public class ApplicationMaster {
   public ApplicationMaster() throws Exception {
     // Set up the configuration and RPC
     conf = new YarnConfiguration();
-    rpc = YarnRPC.create(conf);
   }
 
   /**
@@ -437,17 +436,20 @@ public class ApplicationMaster {
    * @throws YarnException
    * @throws IOException
    */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public boolean run() throws YarnException, IOException {
     LOG.info("Starting ApplicationMaster");
 
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
-    
-    resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID, 
-                                                            1000, 
-                                                            allocListener);
+    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
+    containerListener = new NMCallbackHandler();
+    nmClientAsync = new NMClientAsync(containerListener);
+    nmClientAsync.init(conf);
+    nmClientAsync.start();
+
     // Setup local RPC Server to accept status requests directly from clients
     // TODO need to setup a protocol for client to be able to communicate to
     // the RPC server
@@ -517,6 +519,10 @@ public class ApplicationMaster {
       }
     }
 
+    // When the application completes, it should stop all running containers
+    LOG.info("Application completed. Stopping running containers");
+    nmClientAsync.stop();
+
     // When the application completes, it should send a finish application
     // signal to the RM
     LOG.info("Application completed. Signalling finish to RM");
@@ -548,6 +554,7 @@ public class ApplicationMaster {
   }
   
   private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @SuppressWarnings("unchecked")
     @Override
     public void onContainersCompleted(List<ContainerStatus> completedContainers) {
       LOG.info("Got response from RM for container ask, completedCnt="
@@ -618,8 +625,8 @@ public class ApplicationMaster {
         // + ", containerToken"
         // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
-            allocatedContainer);
+        LaunchContainerRunnable runnableLaunchContainer =
+            new LaunchContainerRunnable(allocatedContainer, containerListener);
         Thread launchThread = new Thread(runnableLaunchContainer);
 
         // launch and start the container on a separate thread to keep
@@ -652,6 +659,64 @@ public class ApplicationMaster {
     }
   }
 
+  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+    private ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to stop Container " + containerId);
+      }
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container Status: id=" + containerId + ", status=" +
+            containerStatus);
+      }
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Succeeded to start Container " + containerId);
+      }
+      Container container = containers.get(containerId);
+      if (container != null) {
+        nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+            container.getContainerToken());
+      }
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start Container " + containerId);
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onGetContainerStatusError(
+        ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query the status of Container " + containerId);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop Container " + containerId);
+      containers.remove(containerId);
+    }
+  }
+
   /**
    * Thread to connect to the {@link ContainerManager} and launch the container
    * that will execute the shell command.
@@ -660,40 +725,17 @@ public class ApplicationMaster {
 
     // Allocated container
     Container container;
-    // Handle to communicate with ContainerManager
-    ContainerManager cm;
+
+    NMCallbackHandler containerListener;
 
     /**
      * @param lcontainer Allocated container
+     * @param containerListener Callback handler of the container
      */
-    public LaunchContainerRunnable(Container lcontainer) {
+    public LaunchContainerRunnable(
+        Container lcontainer, NMCallbackHandler containerListener) {
       this.container = lcontainer;
-    }
-
-    /**
-     * Helper function to connect to CM
-     */
-    private void connectToCM() {
-      LOG.debug("Connecting to ContainerManager for containerid="
-          + container.getId());
-      String cmIpPortStr = container.getNodeId().getHost() + ":"
-          + container.getNodeId().getPort();
-      final InetSocketAddress cmAddress =
-          NetUtils.createSocketAddr(cmIpPortStr);
-      LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
-      UserGroupInformation ugi =
-          UserGroupInformation.createRemoteUser(container.getId().toString());
-      Token<ContainerTokenIdentifier> token =
-          ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
-            cmAddress);
-      ugi.addToken(token);
-      this.cm = ugi.doAs(new PrivilegedAction<ContainerManager>() {
-        @Override
-        public ContainerManager run() {
-          return ((ContainerManager) rpc.getProxy(ContainerManager.class,
-            cmAddress, conf));
-        }
-      });
+      this.containerListener = containerListener;
     }
 
     @Override
@@ -703,9 +745,6 @@ public class ApplicationMaster {
      * start request to the CM. 
      */
     public void run() {
-      // Connect to ContainerManager
-      connectToCM();
-
       LOG.info("Setting up container launch container for containerid="
           + container.getId());
       ContainerLaunchContext ctx = Records
@@ -773,40 +812,8 @@ public class ApplicationMaster {
       commands.add(command.toString());
       ctx.setCommands(commands);
 
-      StartContainerRequest startReq = Records
-          .newRecord(StartContainerRequest.class);
-      startReq.setContainerLaunchContext(ctx);
-      startReq.setContainerToken(container.getContainerToken());
-      try {
-        cm.startContainer(startReq);
-      } catch (YarnException e) {
-        LOG.info("Start container failed for :" + ", containerId="
-            + container.getId());
-        e.printStackTrace();
-        // TODO do we need to release this container?
-      } catch (IOException e) {
-        LOG.info("Start container failed for :" + ", containerId="
-            + container.getId());
-        e.printStackTrace();
-      }
-
-      // Get container status?
-      // Left commented out as the shell scripts are short lived
-      // and we are relying on the status for completed containers
-      // from RM to detect status
-
-      // GetContainerStatusRequest statusReq =
-      // Records.newRecord(GetContainerStatusRequest.class);
-      // statusReq.setContainerId(container.getId());
-      // GetContainerStatusResponse statusResp;
-      // try {
-      // statusResp = cm.getContainerStatus(statusReq);
-      // LOG.info("Container Status"
-      // + ", id=" + container.getId()
-      // + ", status=" +statusResp.getStatus());
-      // } catch (YarnException e) {
-      // e.printStackTrace();
-      // }
+      containerListener.addContainer(container.getId(), container);
+      nmClientAsync.startContainer(container, ctx);
     }
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1493280&r1=1493279&r2=1493280&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Fri Jun 14 23:18:34 2013
@@ -123,21 +123,76 @@ public class TestDistributedShell {
   }
 
   @Test(timeout=30000)
-  public void testDSShellWithNoArgs() throws Exception {
-
-    String[] args = {};
+  public void testDSShellWithInvalidArgs() throws Exception {
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
 
     LOG.info("Initializing DS Client with no args");
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    boolean exceptionThrown = false;
     try {
-      boolean initSuccess = client.init(args);
-      Assert.assertTrue(initSuccess);
+      client.init(new String[]{});
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("No args"));
+    }
+
+    LOG.info("Initializing DS Client with no jar file");
+    try {
+      String[] args = {
+          "--num_containers",
+          "2",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
+          "--master_memory",
+          "512",
+          "--container_memory",
+          "128"
+      };
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("No jar"));
     }
-    catch (IllegalArgumentException e) {
-      exceptionThrown = true;
+
+    LOG.info("Initializing DS Client with no shell command");
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "2",
+          "--master_memory",
+          "512",
+          "--container_memory",
+          "128"
+      };
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("No shell command"));
+    }
+
+    LOG.info("Initializing DS Client with invalid no. of containers");
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "-1",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
+          "--master_memory",
+          "512",
+          "--container_memory",
+          "128"
+      };
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("Invalid no. of containers"));
     }
-    Assert.assertTrue(exceptionThrown);
   }
 
 }