You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/21 00:14:05 UTC

svn commit: r1221517 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/

Author: vinodkv
Date: Tue Dec 20 23:14:05 2011
New Revision: 1221517

URL: http://svn.apache.org/viewvc?rev=1221517&view=rev
Log:
MAPREDUCE-3391. Making a trivial change to correct a log message in DistributedShell app's AM. Contributed by Subroto Sanyal.
svn merge -c 1221516 --ignore-ancestry ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1221517&r1=1221516&r2=1221517&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Dec 20 23:14:05 2011
@@ -84,6 +84,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3518. mapred queue -info <queue> -showJobs throws NPE. 
     (Jonathan Eagles via mahadev)
 
+    MAPREDUCE-3391. Making a trivial change to correct a log message in
+    DistributedShell app's AM. (Subroto Sanyal via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1221517&r1=1221516&r2=1221517&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Dec 20 23:14:05 2011
@@ -184,7 +184,7 @@ public class ApplicationMaster {
   private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
 
   // Launch threads
-  private List<Thread> launchThreads = new ArrayList<Thread>();	
+  private List<Thread> launchThreads = new ArrayList<Thread>();
 
   /**
    * @param args Command line args
@@ -194,7 +194,7 @@ public class ApplicationMaster {
     try {
       ApplicationMaster appMaster = new ApplicationMaster();
       LOG.info("Initializing ApplicationMaster");
-      boolean doRun = appMaster.init(args);	
+      boolean doRun = appMaster.init(args);
       if (!doRun) {
         System.exit(0);
       }
@@ -202,14 +202,14 @@ public class ApplicationMaster {
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       System.exit(1);
-    }		
+    }
     if (result) {
       LOG.info("Application Master completed successfully. exiting");
       System.exit(0);
     }
     else {
       LOG.info("Application Master failed. exiting");
-      System.exit(2);			
+      System.exit(2);
     }
   }
 
@@ -218,7 +218,7 @@ public class ApplicationMaster {
    */
   private void dumpOutDebugInfo() {
 
-    LOG.info("Dump debug output");		
+    LOG.info("Dump debug output");
     Map<String, String> envs = System.getenv();
     for (Map.Entry<String, String> env : envs.entrySet()) {
       LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
@@ -277,7 +277,7 @@ public class ApplicationMaster {
     if (args.length == 0) {
       printUsage(opts);
       throw new IllegalArgumentException("No args specified for application master to initialize");
-    }		
+    }
 
     if (cliParser.hasOption("help")) {
       printUsage(opts);
@@ -297,8 +297,8 @@ public class ApplicationMaster {
         appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
       } 
       else {
-        throw new IllegalArgumentException("Application Attempt Id not set in the environment");				
-      }	
+        throw new IllegalArgumentException("Application Attempt Id not set in the environment");
+      }
     } else {
       ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
       appAttemptID = containerId.getApplicationAttemptId();
@@ -338,11 +338,11 @@ public class ApplicationMaster {
     if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
       shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
 
-      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {				
-        shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));				
-      }					
+      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
+        shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
+      }
       if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
-        shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));				
+        shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
       }
 
       if (!shellScriptPath.isEmpty()
@@ -351,7 +351,7 @@ public class ApplicationMaster {
         LOG.error("Illegal values in env for shell script path"
             + ", path=" + shellScriptPath
             + ", len=" + shellScriptPathLen
-            + ", timestamp=" + shellScriptPathTimestamp);	
+            + ", timestamp=" + shellScriptPathTimestamp);
         throw new IllegalArgumentException("Illegal values in env for shell script path");
       }
     }
@@ -368,7 +368,7 @@ public class ApplicationMaster {
    * @param opts Parsed command line options
    */
   private void printUsage(Options opts) {
-    new HelpFormatter().printHelp("ApplicationMaster", opts);		
+    new HelpFormatter().printHelp("ApplicationMaster", opts);
   }
 
   /**
@@ -378,7 +378,7 @@ public class ApplicationMaster {
   public boolean run() throws YarnRemoteException {
     LOG.info("Starting ApplicationMaster");
 
-    // Connect to ResourceManager 	
+    // Connect to ResourceManager
     resourceManager = connectToRM();
 
     // Setup local RPC Server to accept status requests directly from clients 
@@ -395,7 +395,7 @@ public class ApplicationMaster {
 
     // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
     // a multiple of the min value and cannot exceed the max. 
-    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min		
+    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
     if (containerMemory < minMem) {
       LOG.info("Container memory specified below min threshold of cluster. Using min value."
           + ", specified=" + containerMemory
@@ -409,14 +409,14 @@ public class ApplicationMaster {
       containerMemory = maxMem;
     }
 
-    // Setup heartbeat emitter 		
+    // Setup heartbeat emitter
     // TODO poll RM every now and then with an empty request to let RM know that we are alive
     // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 
     // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
     // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 
     // is not required.
 
-    // Setup ask for containers from RM		    
+    // Setup ask for containers from RM
     // Send request for containers to RM
     // Until we get our fully allocated quota, we keep on polling RM for containers
     // Keep looping until all the containers are launched and shell script executed on them 
@@ -426,7 +426,7 @@ public class ApplicationMaster {
 
     while (numCompletedContainers.get() < numTotalContainers
         && !appDone) {
-      loopCounter++;		
+      loopCounter++;
 
       // log current state
       LOG.info("Current application state: loop=" + loopCounter 
@@ -435,7 +435,7 @@ public class ApplicationMaster {
           + ", requested=" + numRequestedContainers
           + ", completed=" + numCompletedContainers
           + ", failed=" + numFailedContainers
-          + ", currentAllocated=" + numAllocatedContainers);			
+          + ", currentAllocated=" + numAllocatedContainers);
 
       // Sleep before each loop when asking RM for containers
       // to avoid flooding RM with spurious requests when it 
@@ -444,7 +444,7 @@ public class ApplicationMaster {
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        LOG.info("Sleep interrupted " + e.getMessage());				
+        LOG.info("Sleep interrupted " + e.getMessage());
       }
 
       // No. of containers to request 
@@ -457,14 +457,14 @@ public class ApplicationMaster {
       // Setup request to be sent to RM to allocate containers
       List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
       if (askCount > 0) {
-        ResourceRequest containerAsk = setupContainerAskForRM(askCount);			
+        ResourceRequest containerAsk = setupContainerAskForRM(askCount);
         resourceReq.add(containerAsk);
       }
 
       // Send the request to RM 
       LOG.info("Asking RM for containers"
           + ", askCount=" + askCount);
-      AMResponse amResp =	sendContainerAskToRM(resourceReq);			
+      AMResponse amResp =sendContainerAskToRM(resourceReq);
 
       // Retrieve list of allocated containers from the response 
       List<Container> allocatedContainers = amResp.getAllocatedContainers();
@@ -478,10 +478,10 @@ public class ApplicationMaster {
             + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
             + ", containerState" + allocatedContainer.getState()
             + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
-        //						+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
+        //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
 
         LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
-        Thread launchThread = new Thread(runnableLaunchContainer);	
+        Thread launchThread = new Thread(runnableLaunchContainer);
 
         // launch and start the container on a separate thread to keep the main thread unblocked
         // as all containers may not be allocated at one go.
@@ -492,14 +492,14 @@ public class ApplicationMaster {
       // Check what the current available resources in the cluster are
       // TODO should we do anything if the available resources are not enough? 
       Resource availableResources = amResp.getAvailableResources();
-      LOG.info("Current available resources in the cluster " + availableResources);			
+      LOG.info("Current available resources in the cluster " + availableResources);
 
-      // Check the completed containers 			
+      // Check the completed containers
       List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
       LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
-      for (ContainerStatus containerStatus : completedContainers) {				
+      for (ContainerStatus containerStatus : completedContainers) {
         LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
-            + ", state=" + containerStatus.getState()	
+            + ", state=" + containerStatus.getState()
             + ", exitStatus=" + containerStatus.getExitStatus() 
             + ", diagnostics=" + containerStatus.getDiagnostics());
 
@@ -514,7 +514,7 @@ public class ApplicationMaster {
             // shell script failed
             // counts as completed 
             numCompletedContainers.incrementAndGet();
-            numFailedContainers.incrementAndGet();							
+            numFailedContainers.incrementAndGet();
           }
           else { 
             // something else bad happened 
@@ -541,15 +541,15 @@ public class ApplicationMaster {
 
       LOG.info("Current application state: loop=" + loopCounter
           + ", appDone=" + appDone
-          + ", total=" + numTotalContainers					
+          + ", total=" + numTotalContainers
           + ", requested=" + numRequestedContainers
           + ", completed=" + numCompletedContainers
           + ", failed=" + numFailedContainers
-          + ", currentAllocated=" + numAllocatedContainers);	
+          + ", currentAllocated=" + numAllocatedContainers);
 
       // TODO 
       // Add a timeout handling layer 
-      // for misbehaving shell commands			
+      // for misbehaving shell commands
     }
 
     // Join all launched threads
@@ -561,7 +561,7 @@ public class ApplicationMaster {
       } catch (InterruptedException e) {
         LOG.info("Exception thrown in thread join: " + e.getMessage());
         e.printStackTrace();
-      }			
+      }
     }
 
     // When the application completes, it should send a finish application signal 
@@ -610,10 +610,11 @@ public class ApplicationMaster {
      * Helper function to connect to CM
      */
     private void connectToCM() {
-      String cmIpPortStr = container.getNodeId().getHost() + ":" 
-          + container.getNodeId().getPort();		
-      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);		
-      LOG.info("Connecting to ResourceManager at " + cmIpPortStr);
+      LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
+      String cmIpPortStr = container.getNodeId().getHost() + ":"
+          + container.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
       this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
     }
 
@@ -626,7 +627,6 @@ public class ApplicationMaster {
      */
     public void run() {
       // Connect to ContainerManager 
-      LOG.info("Connecting to container manager for containerid=" + container.getId());
       connectToCM();
 
       LOG.info("Setting up container launch container for containerid=" + container.getId());
@@ -654,7 +654,7 @@ public class ApplicationMaster {
       if (!shellScriptPath.isEmpty()) {
         LocalResource shellRsrc = Records.newRecord(LocalResource.class);
         shellRsrc.setType(LocalResourceType.FILE);
-        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
+        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
         try {
           shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
         } catch (URISyntaxException e) {
@@ -664,17 +664,17 @@ public class ApplicationMaster {
 
           // A failure scenario on bad input such as invalid shell script path 
           // We know we cannot continue launching the container 
-          // so we should release it. 															 					
+          // so we should release it.
           // TODO
           numCompletedContainers.incrementAndGet();
           numFailedContainers.incrementAndGet();
-          return;					
+          return;
         }
         shellRsrc.setTimestamp(shellScriptPathTimestamp);
         shellRsrc.setSize(shellScriptPathLen);
         localResources.put(ExecShellStringPath, shellRsrc);
-      }			
-      ctx.setLocalResources(localResources);			
+      }
+      ctx.setLocalResources(localResources);
 
       // Set the necessary command to execute on the allocated container 
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@@ -686,7 +686,7 @@ public class ApplicationMaster {
         vargs.add(ExecShellStringPath);
       }
 
-      // Set args for the shell command if any			
+      // Set args for the shell command if any
       vargs.add(shellArgs);
       // Add log redirect params
       // TODO
@@ -722,19 +722,19 @@ public class ApplicationMaster {
       // Left commented out as the shell scripts are short lived 
       // and we are relying on the status for completed containers from RM to detect status
 
-      //		    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
-      //		    statusReq.setContainerId(container.getId());
-      //		    GetContainerStatusResponse statusResp;
-      //			try {
-      //				statusResp = cm.getContainerStatus(statusReq);
-      //			    LOG.info("Container Status"
-      //			    		+ ", id=" + container.getId()
-      //			    		+ ", status=" +statusResp.getStatus());
-      //			} catch (YarnRemoteException e) {
-      //				e.printStackTrace();
-      //			}
-    }		
-  }	
+      //    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
+      //    statusReq.setContainerId(container.getId());
+      //    GetContainerStatusResponse statusResp;
+      //try {
+      //statusResp = cm.getContainerStatus(statusReq);
+      //    LOG.info("Container Status"
+      //    + ", id=" + container.getId()
+      //    + ", status=" +statusResp.getStatus());
+      //} catch (YarnRemoteException e) {
+      //e.printStackTrace();
+      //}
+    }
+  }
 
   /**
    * Connect to the Resource Manager
@@ -744,25 +744,25 @@ public class ApplicationMaster {
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
     InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));		
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
     LOG.info("Connecting to ResourceManager at " + rmAddress);
     return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
-  }		
+  }
 
   /** 
    * Register the Application Master to the Resource Manager
    * @return the registration response from the RM
    * @throws YarnRemoteException
    */
-  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {		
-    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);	
+  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
+    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
 
     // set the required info into the registration request: 
     // application attempt id, 
     // host on which the app master is running
     // rpc port on which the app master accepts requests from the client 
     // tracking url for the app master
-    appMasterRequest.setApplicationAttemptId(appAttemptID);	
+    appMasterRequest.setApplicationAttemptId(appAttemptID);
     appMasterRequest.setHost(appMasterHostname);
     appMasterRequest.setRpcPort(appMasterRpcPort);
     appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
@@ -792,7 +792,7 @@ public class ApplicationMaster {
     Priority pri = Records.newRecord(Priority.class);
     // TODO - what is the range for priority? how to decide? 
     pri.setPriority(requestPriority);
-    request.setPriority(pri);	    
+    request.setPriority(pri);
 
     // Set up resource type requirements
     // For now, only memory is supported so we set memory requirements
@@ -810,7 +810,7 @@ public class ApplicationMaster {
    * @throws YarnRemoteException
    */
   private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
-      throws YarnRemoteException {	
+      throws YarnRemoteException {
     AllocateRequest req = Records.newRecord(AllocateRequest.class);
     req.setResponseId(rmRequestID.incrementAndGet());
     req.setApplicationAttemptId(appAttemptID);
@@ -830,7 +830,7 @@ public class ApplicationMaster {
       LOG.info("Released container, id=" + id.getId());
     }
 
-    AllocateResponse resp = resourceManager.allocate(req);		
-    return resp.getAMResponse();		    
+    AllocateResponse resp = resourceManager.allocate(req);
+    return resp.getAMResponse();
   }
 }