You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text rendering.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/06/09 11:37:28 UTC

svn commit: r1684362 - /hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java

Author: edwardyoon
Date: Tue Jun  9 09:37:28 2015
New Revision: 1684362

URL: http://svn.apache.org/r1684362
Log:
HAMA-939: Refactor code that was implemented using an out-of-date status response

Modified:
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java?rev=1684362&r1=1684361&r2=1684362&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java Tue Jun  9 09:37:28 2015
@@ -67,7 +67,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class ApplicationMaster  implements BSPClient, BSPPeerProtocol {
+public class ApplicationMaster implements BSPClient, BSPPeerProtocol {
   private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
 
   // Configuration
@@ -117,7 +117,6 @@ public class ApplicationMaster  implemen
   @VisibleForTesting
   protected ApplicationAttemptId appAttemptID;
 
-
   // TODO
   // For status update for clients - yet to be implemented
   // Hostname of the container
@@ -133,7 +132,8 @@ public class ApplicationMaster  implemen
   protected int numTotalContainers;
   // Memory to request for the container on which the shell command will run
   private int containerMemory;
-  // VirtualCores to request for the container on which the shell command will run
+  // VirtualCores to request for the container on which the shell command will
+  // run
   private int containerVirtualCores = 1;
 
   // Priority of the request
@@ -160,8 +160,8 @@ public class ApplicationMaster  implemen
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
   @VisibleForTesting
-  protected final Set<ContainerId> launchedContainers =
-      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+  protected final Set<ContainerId> launchedContainers = Collections
+      .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
 
   public ApplicationMaster() {
     // Set up the configuration
@@ -171,7 +171,7 @@ public class ApplicationMaster  implemen
   public static void main(String[] args) throws IOException {
     boolean result = false;
     ApplicationMaster appMaster = new ApplicationMaster();
-    
+
     try {
       LOG.info("Initializing ApplicationMaster");
       boolean doRun = appMaster.init(args);
@@ -188,7 +188,7 @@ public class ApplicationMaster  implemen
       LOG.info("Stop SyncServer and RPCServer.");
       appMaster.close();
     }
-    
+
     if (result) {
       LOG.info("Application Master completed successfully. exiting");
       System.exit(0);
@@ -254,7 +254,7 @@ public class ApplicationMaster  implemen
 
   /**
    * Main run function for the application master
-   *
+   * 
    * @throws org.apache.hadoop.yarn.exceptions.YarnException
    * @throws IOException
    */
@@ -264,8 +264,8 @@ public class ApplicationMaster  implemen
 
     // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
     // are marked as LimitedPrivate
-    Credentials credentials =
-        UserGroupInformation.getCurrentUser().getCredentials();
+    Credentials credentials = UserGroupInformation.getCurrentUser()
+        .getCredentials();
     DataOutputBuffer dob = new DataOutputBuffer();
     credentials.writeTokenStorageToStream(dob);
     // Now remove the AM->RM token so that containers cannot access it.
@@ -281,13 +281,12 @@ public class ApplicationMaster  implemen
     allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 
     // Create appSubmitterUgi and add original tokens to it
-    String appSubmitterUserName =
-        System.getenv(ApplicationConstants.Environment.USER.name());
-    appSubmitterUgi =
-        UserGroupInformation.createRemoteUser(appSubmitterUserName);
+    String appSubmitterUserName = System
+        .getenv(ApplicationConstants.Environment.USER.name());
+    appSubmitterUgi = UserGroupInformation
+        .createRemoteUser(appSubmitterUserName);
     appSubmitterUgi.addCredentials(credentials);
 
-
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
     amRMClient.init(localConf);
@@ -328,23 +327,24 @@ public class ApplicationMaster  implemen
 
     if (containerVirtualCores > maxVCores) {
       LOG.info("Container virtual cores specified above max threshold of cluster."
-          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
-          + maxVCores);
+          + " Using max value."
+          + ", specified="
+          + containerVirtualCores
+          + ", max=" + maxVCores);
       containerVirtualCores = maxVCores;
     }
 
-    List<Container> previousAMRunningContainers =
-        response.getContainersFromPreviousAttempts();
+    List<Container> previousAMRunningContainers = response
+        .getContainersFromPreviousAttempts();
     LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
         + " previous attempts' running containers on AM registration.");
-    for(Container container: previousAMRunningContainers) {
+    for (Container container : previousAMRunningContainers) {
       launchedContainers.add(container.getId());
     }
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
 
-
-    int numTotalContainersToRequest =
-        numTotalContainers - previousAMRunningContainers.size();
+    int numTotalContainersToRequest = numTotalContainers
+        - previousAMRunningContainers.size();
     // Setup ask for containers from RM
     // Send request for containers to RM
     // Until we get our fully allocated quota, we keep on polling RM for
@@ -366,11 +366,11 @@ public class ApplicationMaster  implemen
   @VisibleForTesting
   protected boolean finish() {
     // wait for completion.
-    while (!done
-        && (numCompletedContainers.get() != numTotalContainers)) {
+    while (!done && (numCompletedContainers.get() != numTotalContainers)) {
       try {
         Thread.sleep(200);
-      } catch (InterruptedException ex) {}
+      } catch (InterruptedException ex) {
+      }
     }
 
     // Join all launched threads
@@ -396,8 +396,8 @@ public class ApplicationMaster  implemen
     FinalApplicationStatus appStatus;
     String appMessage = null;
     boolean success = true;
-    if (numFailedContainers.get() == 0 &&
-        numCompletedContainers.get() == numTotalContainers) {
+    if (numFailedContainers.get() == 0
+        && numCompletedContainers.get() == numTotalContainers) {
       appStatus = FinalApplicationStatus.SUCCEEDED;
     } else {
       appStatus = FinalApplicationStatus.FAILED;
@@ -504,7 +504,7 @@ public class ApplicationMaster  implemen
             + ", containerResourceVirtualCores"
             + allocatedContainer.getResource().getVirtualCores());
 
-        Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId());
+        Thread launchThread = createLaunchContainerThread(allocatedContainer);
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
@@ -541,11 +541,9 @@ public class ApplicationMaster  implemen
   }
 
   @VisibleForTesting
-  static class NMCallbackHandler
-      implements NMClientAsync.CallbackHandler {
+  static class NMCallbackHandler implements NMClientAsync.CallbackHandler {
 
-    private ConcurrentMap<ContainerId, Container> containers =
-        new ConcurrentHashMap<ContainerId, Container>();
+    private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
     private final ApplicationMaster applicationMaster;
 
     public NMCallbackHandler(ApplicationMaster applicationMaster) {
@@ -568,8 +566,8 @@ public class ApplicationMaster  implemen
     public void onContainerStatusReceived(ContainerId containerId,
         ContainerStatus containerStatus) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Container Status: id=" + containerId + ", status=" +
-            containerStatus);
+        LOG.debug("Container Status: id=" + containerId + ", status="
+            + containerStatus);
       }
     }
 
@@ -581,7 +579,8 @@ public class ApplicationMaster  implemen
       }
       Container container = containers.get(containerId);
       if (container != null) {
-        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId,
+            container.getNodeId());
       }
     }
 
@@ -594,8 +593,7 @@ public class ApplicationMaster  implemen
     }
 
     @Override
-    public void onGetContainerStatusError(
-        ContainerId containerId, Throwable t) {
+    public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
       LOG.error("Failed to query the status of Container " + containerId);
     }
 
@@ -607,8 +605,8 @@ public class ApplicationMaster  implemen
   }
 
   /**
-   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
-   * that will execute the shell command.
+   * Thread to connect to the {@link ContainerManagementProtocol} and launch the
+   * container that will execute the shell command.
    */
   private class LaunchContainerRunnable implements Runnable {
 
@@ -619,25 +617,20 @@ public class ApplicationMaster  implemen
 
     Configuration conf;
 
-    long taskAttemptId;
-
     /**
-     * @param lcontainer        Allocated container
+     * @param lcontainer Allocated container
      * @param containerListener Callback handler of the container
      */
-    public LaunchContainerRunnable(
-        Container lcontainer, NMCallbackHandler containerListener,
-        Configuration conf, long taskAttemptId) {
+    public LaunchContainerRunnable(Container lcontainer,
+        NMCallbackHandler containerListener, Configuration conf) {
       this.container = lcontainer;
       this.containerListener = containerListener;
       this.conf = conf;
-      this.taskAttemptId = taskAttemptId;
     }
 
     /**
-     * Connects to CM, sets up container launch context
-     * for shell command and eventually dispatches the container
-     * start request to the CM.
+     * Connects to CM, sets up container launch context for shell command and
+     * eventually dispatches the container start request to the CM.
      */
     @Override
     public void run() {
@@ -656,11 +649,12 @@ public class ApplicationMaster  implemen
       } catch (IOException e) {
         e.printStackTrace();
       }
-      Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
+      Path packageFile = new Path(
+          System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
       URL packageUrl = null;
       try {
         packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+            .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
         LOG.info("PackageURL has been composed to " + packageUrl.toString());
         LOG.info("Reverting packageURL to path: "
             + ConverterUtils.getPathFromYarnURL(packageUrl));
@@ -672,25 +666,27 @@ public class ApplicationMaster  implemen
       }
 
       packageResource.setResource(packageUrl);
-      packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
-      packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+      packageResource.setSize(Long.parseLong(System
+          .getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+      packageResource.setTimestamp(Long.parseLong(System
+          .getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
       packageResource.setType(LocalResourceType.FILE);
       packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
 
       localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
 
-      Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
+      Path hamaReleaseFile = new Path(
+          System.getenv(YARNBSPConstants.HAMA_LOCATION));
       URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
           .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-      LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
-          .toString());
+      LOG.info("Hama release URL has been composed to "
+          + hamaReleaseUrl.toString());
 
       RemoteIterator<LocatedFileStatus> fileStatusListIterator = null;
       try {
-        fileStatusListIterator = fs.listFiles(
-            hamaReleaseFile, true);
+        fileStatusListIterator = fs.listFiles(hamaReleaseFile, true);
 
-        while(fileStatusListIterator.hasNext()) {
+        while (fileStatusListIterator.hasNext()) {
           LocatedFileStatus lfs = fileStatusListIterator.next();
           LocalResource localRsrc = LocalResource.newInstance(
               ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
@@ -704,16 +700,16 @@ public class ApplicationMaster  implemen
 
       ctx.setLocalResources(localResources);
 
-    /*
-     * TODO Package classpath seems not to work if you're in pseudo distributed
-     * mode, because the resource must not be moved, it will never be unpacked.
-     * So we will check if our jar file has the file:// prefix and put it into
-     * the CP directly
-     */
+      /*
+       * TODO Package classpath seems not to work if you're in pseudo
+       * distributed mode, because the resource must not be moved, it will never
+       * be unpacked. So we will check if our jar file has the file:// prefix
+       * and put it into the CP directly
+       */
 
       StringBuilder classPathEnv = new StringBuilder(
-          ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
-          .append("./*");
+          ApplicationConstants.Environment.CLASSPATH.$()).append(
+          File.pathSeparatorChar).append("./*");
       for (String c : conf.getStrings(
           YarnConfiguration.YARN_APPLICATION_CLASSPATH,
           YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
@@ -727,13 +723,14 @@ public class ApplicationMaster  implemen
       vargs.add(BSPRunner.class.getCanonicalName());
 
       vargs.add(jobId.getJtIdentifier());
-      vargs.add(Long.toString(taskAttemptId));
-      vargs.add(
-          new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
-              .toString());
-
-      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
-      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");
+      vargs.add(Long.toString(container.getId().getContainerId()));
+      vargs.add(new Path(jobFile).makeQualified(fs.getUri(),
+          fs.getWorkingDirectory()).toString());
+
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+          + "/hama-worker.stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+          + "/hama-worker.stderr");
 
       // Get final commmand
       StringBuilder command = new StringBuilder();
@@ -755,7 +752,7 @@ public class ApplicationMaster  implemen
 
   /**
    * Setup the request that will be sent to the RM for the container ask.
-   *
+   * 
    * @return the setup ResourceRequest to be sent to RM
    */
   private AMRMClient.ContainerRequest setupContainerAskForRM() {
@@ -766,12 +763,13 @@ public class ApplicationMaster  implemen
     Priority pri = Priority.newInstance(requestPriority);
 
     // Set up resource type requirements
-    // For now, memory and CPU are supported so we set memory and cpu requirements
+    // For now, memory and CPU are supported so we set memory and cpu
+    // requirements
     Resource capability = Resource.newInstance(containerMemory,
         containerVirtualCores);
 
-    AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, null, null,
-        pri);
+    AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(
+        capability, null, null, pri);
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
@@ -786,7 +784,7 @@ public class ApplicationMaster  implemen
 
     FileSystem fs = FileSystem.get(URI.create(path), jobConf);
 
-    InputStream in =fs.open(jobSubmitPath);
+    InputStream in = fs.open(jobSubmitPath);
     jobConf.addResource(in);
 
     return jobConf;
@@ -795,7 +793,7 @@ public class ApplicationMaster  implemen
   /**
    * Gets the application attempt ID from the environment. This should be set by
    * YARN when the container has been launched.
-   *
+   * 
    * @return a new ApplicationAttemptId which is unique and identifies this
    *         task.
    */
@@ -816,7 +814,7 @@ public class ApplicationMaster  implemen
    * This method starts the sync server on a specific port and waits for it to
    * come up. Be aware that this method adds the "bsp.sync.server.address" that
    * is needed for a task to connect to the service.
-   *
+   * 
    * @throws IOException
    */
   private void startSyncServer() throws Exception {
@@ -828,8 +826,8 @@ public class ApplicationMaster  implemen
   }
 
   /**
-   * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers
-   * using Runnable interface in java.
+   * This method is to run Zookeeper in order to coordinates between BSPMaster
+   * and Groomservers using Runnable interface in java.
    */
   private static class ZKServerThread implements Runnable {
     SyncServer server;
@@ -852,12 +850,13 @@ public class ApplicationMaster  implemen
    * This method starts the needed RPC servers: client server and the task
    * server. This method manipulates the configuration and therefore needs to be
    * executed BEFORE the submitconfiguration gets rewritten.
-   *
+   * 
    * @throws IOException
    */
   private void startRPCServers() throws IOException {
     // start the RPC server which talks to the client
-    this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
+    this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort,
+        jobConf);
     this.clientServer.start();
 
     // start the RPC server which talks to the tasks
@@ -886,6 +885,7 @@ public class ApplicationMaster  implemen
 
   /**
    * Get container memory from "bsp.child.mem.in.mb" set on Hama configuration
+   * 
    * @return The memory of container.
    */
   private int getMemoryRequirements(Configuration conf) {
@@ -907,8 +907,7 @@ public class ApplicationMaster  implemen
     }
 
     if (!opts.contains("-Xmx")) {
-      LOG.info(
-          "No \"-Xmx\" option found in child opts, using default amount of memory!");
+      LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
       return DEFAULT_MEMORY_MB;
     } else {
       // e.G: -Xmx512m
@@ -916,8 +915,8 @@ public class ApplicationMaster  implemen
       int startIndex = opts.indexOf("-Xmx") + 4;
       String xmxString = opts.substring(startIndex);
       char qualifier = xmxString.charAt(xmxString.length() - 1);
-      int memory = Integer
-          .valueOf(xmxString.substring(0, xmxString.length() - 1));
+      int memory = Integer.valueOf(xmxString.substring(0,
+          xmxString.length() - 1));
       if (qualifier == 'm') {
         return memory;
       } else if (qualifier == 'g') {
@@ -931,9 +930,9 @@ public class ApplicationMaster  implemen
   }
 
   @VisibleForTesting
-  Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
-    LaunchContainerRunnable runnableLaunchContainer =
-        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
+  Thread createLaunchContainerThread(Container allocatedContainer) {
+    LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+        allocatedContainer, containerListener, jobConf);
     return new Thread(runnableLaunchContainer);
   }
 
@@ -946,7 +945,7 @@ public class ApplicationMaster  implemen
   public Task getTask(TaskAttemptID taskid) throws IOException {
     BSPJobClient.RawSplit assignedSplit = null;
     String splitName = NullInputFormat.NullInputSplit.class.getName();
-    //String splitName = NullInputSplit.class.getCanonicalName();
+    // String splitName = NullInputSplit.class.getCanonicalName();
     if (splits != null) {
       assignedSplit = splits[taskid.id];
       splitName = assignedSplit.getClassName();
@@ -1003,6 +1002,7 @@ public class ApplicationMaster  implemen
     this.clientServer.stop();
     this.taskServer.stop();
     this.syncServer.stopServer();
+    threadPool.shutdown();
   }
 
   @Override