Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/02/10 02:49:30 UTC

svn commit: r1242635 [2/10] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Feb 10 01:49:08 2012
@@ -19,19 +19,24 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -127,7 +132,7 @@ import org.apache.hadoop.yarn.util.RackR
 /**
  * Implementation of TaskAttempt interface.
  */
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public abstract class TaskAttemptImpl implements
     org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
       EventHandler<TaskAttemptEvent> {
@@ -142,7 +147,7 @@ public abstract class TaskAttemptImpl im
   protected final JobConf conf;
   protected final Path jobFile;
   protected final int partition;
-  protected final EventHandler eventHandler;
+  protected EventHandler eventHandler;
   private final TaskAttemptId attemptId;
   private final Clock clock;
   private final org.apache.hadoop.mapred.JobID oldJobId;
@@ -910,8 +915,10 @@ public abstract class TaskAttemptImpl im
   @SuppressWarnings("unchecked")
   @Override
   public void handle(TaskAttemptEvent event) {
-    LOG.info("Processing " + event.getTaskAttemptID() +
-        " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+          + event.getType());
+    }
     writeLock.lock();
     try {
       final TaskAttemptState oldState = getState();
@@ -1054,7 +1061,7 @@ public abstract class TaskAttemptImpl im
     }
   }
 
-  private static class RequestContainerTransition implements
+  static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;
     public RequestContainerTransition(boolean rescheduled) {
@@ -1074,19 +1081,49 @@ public abstract class TaskAttemptImpl im
                 taskAttempt.attemptId, 
                 taskAttempt.resourceCapability));
       } else {
-        int i = 0;
-        String[] racks = new String[taskAttempt.dataLocalHosts.length];
+        Set<String> racks = new HashSet<String>(); 
         for (String host : taskAttempt.dataLocalHosts) {
-          racks[i++] = RackResolver.resolve(host).getNetworkLocation();
+          racks.add(RackResolver.resolve(host).getNetworkLocation());
         }
-        taskAttempt.eventHandler.handle(
-            new ContainerRequestEvent(taskAttempt.attemptId, 
-                taskAttempt.resourceCapability, 
-                taskAttempt.dataLocalHosts, racks));
+        taskAttempt.eventHandler.handle(new ContainerRequestEvent(
+            taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
+                .resolveHosts(taskAttempt.dataLocalHosts), racks
+                .toArray(new String[racks.size()])));
       }
     }
   }
 
+  protected String[] resolveHosts(String[] src) {
+    String[] result = new String[src.length];
+    for (int i = 0; i < src.length; i++) {
+      if (isIP(src[i])) {
+        result[i] = resolveHost(src[i]);
+      } else {
+        result[i] = src[i];
+      }
+    }
+    return result;
+  }
+
+  protected String resolveHost(String src) {
+    String result = src; // Fallback in case of failure.
+    try {
+      InetAddress addr = InetAddress.getByName(src);
+      result = addr.getHostName();
+    } catch (UnknownHostException e) {
+      LOG.warn("Failed to resolve address: " + src
+          + ". Continuing to use the same.");
+    }
+    return result;
+  }
+
+  private static final Pattern ipPattern = // Pattern for matching ip
+    Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");
+  
+  protected boolean isIP(String src) {
+    return ipPattern.matcher(src).matches();
+  }
+
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings({ "unchecked" })
@@ -1278,15 +1315,11 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      String taskType = 
-          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
-      LOG.info("In TaskAttemptImpl taskType: " + taskType);
       long slotMillis = computeSlotMillis(taskAttempt);
-      JobCounterUpdateEvent jce =
-          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
-              .getJobId());
+      TaskId taskId = taskAttempt.attemptId.getTaskId();
+      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
       jce.addCounterUpdate(
-        taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+        taskId.getTaskType() == TaskType.MAP ? 
           JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
           slotMillis);
       taskAttempt.eventHandler.handle(jce);

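The new resolveHosts()/isIP()/resolveHost() helpers above reverse-resolve raw
IPv4 addresses in the data-local host list, and the rack array is now built
through a HashSet so duplicate racks collapse into a single entry. A minimal,
self-contained sketch of the same idea (this class and the sample hosts are
illustrative, not part of the commit):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;

    public class HostResolutionSketch {
      // Same IPv4 pattern the commit adds; note it does not range-check octets.
      private static final Pattern IP =
          Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");

      static boolean isIP(String src) {
        return IP.matcher(src).matches();
      }

      static String resolveHost(String src) {
        try {
          return InetAddress.getByName(src).getHostName();
        } catch (UnknownHostException e) {
          return src; // fall back to the raw address, as the commit does
        }
      }

      public static void main(String[] args) {
        String[] dataLocalHosts = { "127.0.0.1", "localhost", "127.0.0.1" };
        Set<String> hosts = new HashSet<String>();
        for (String h : dataLocalHosts) {
          hosts.add(isIP(h) ? resolveHost(h) : h);
        }
        System.out.println(hosts); // duplicates collapse, e.g. [localhost]
      }
    }
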
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Feb 10 01:49:08 2012
@@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.state.Stat
 /**
  * Implementation of Task interface.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
@@ -505,7 +505,9 @@ public abstract class TaskImpl implement
   // This is always called in the Write Lock
   private void addAndScheduleAttempt() {
     TaskAttempt attempt = createAttempt();
-    LOG.info("Created attempt " + attempt.getID());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getID());
+    }
     switch (attempts.size()) {
       case 0:
         attempts = Collections.singletonMap(attempt.getID(), attempt);
@@ -537,7 +539,10 @@ public abstract class TaskImpl implement
 
   @Override
   public void handle(TaskEvent event) {
-    LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskID() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       TaskState oldState = getState();
@@ -559,6 +564,7 @@ public abstract class TaskImpl implement
   }
 
   private void internalError(TaskEventType type) {
+    LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
         this.taskId.getJobId(), "Invalid event " + type + 
         " on Task " + this.taskId));

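Both TaskAttemptImpl and TaskImpl now wrap their per-event log statements in
LOG.isDebugEnabled() guards. The guard matters because the message string is
concatenated eagerly on every event, even when DEBUG is off; a hypothetical
stand-alone illustration of the pattern (commons-logging, as in the code
above):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedLoggingSketch {
      private static final Log LOG =
          LogFactory.getLog(GuardedLoggingSketch.class);

      void handle(Object event) {
        // Without the guard, the concatenation below runs on every call;
        // with it, the hot path pays only one boolean check when DEBUG
        // logging is disabled.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing " + event + " of type " + event.getClass());
        }
      }
    }
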
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Feb 10 01:49:08 2012
@@ -103,6 +103,7 @@ public class LocalContainerAllocator ext
       // This can happen when the connection to the RM has gone down. Keep
       // re-trying until the retryInterval has expired.
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +

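The LOG.error added above fires once the retry window against the
ResourceManager is exhausted, just before the job is failed. For reference, a
generic sketch of that retry-window pattern (the class and Call interface are
assumptions for illustration, not part of the commit):

    public class RetryWindowSketch {
      interface Call { void run() throws Exception; }

      // Keep retrying until retryIntervalMs has elapsed since the first
      // failure, then rethrow, as the allocator does above.
      static void callWithRetryWindow(Call call, long retryIntervalMs,
          long sleepMs) throws Exception {
        long retryStartTime = System.currentTimeMillis();
        while (true) {
          try {
            call.run();
            return;
          } catch (Exception e) {
            if (System.currentTimeMillis() - retryStartTime >= retryIntervalMs) {
              throw e;
            }
            Thread.sleep(sleepMs);
          }
        }
      }
    }
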
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Fri Feb 10 01:49:08 2012
@@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -191,6 +193,11 @@ public class RecoveryService extends Com
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file " + historyFile + 
+          ", ignoring incomplete events.", parseException);
+    }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
@@ -353,16 +360,24 @@ public class RecoveryService extends Com
           //recover the task output
           TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
               attInfo.getAttemptId());
-          try {
-            committer.recoverTask(taskContext);
+          try { 
+            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
+            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+              committer.recoverTask(taskContext);
+              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+            } else {
+              LOG.info("Will not try to recover output for "
+                  + taskContext.getTaskAttemptID());
+            }
           } catch (IOException e) {
+            LOG.error("Caught an exception while trying to recover task "+aId, e);
             actualHandler.handle(new JobDiagnosticsUpdateEvent(
                 aId.getTaskId().getJobId(), "Error in recovering task output " + 
                 e.getMessage()));
             actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
                 JobEventType.INTERNAL_ERROR));
           }
-          LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
           
           // send the done event
           LOG.info("Sending done event to " + aId);

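RecoveryService now attempts output recovery only where committed output can
exist: always for REDUCE attempts, and for MAP attempts only in map-only jobs
(numReducers <= 0), since map output in a job with reducers is intermediate
data rather than committed output. A stand-alone restatement of that guard
(enum and method names here are illustrative):

    public class RecoveryGuardSketch {
      enum TaskType { MAP, REDUCE }

      static boolean shouldRecoverOutput(TaskType type, int numReducers) {
        return type == TaskType.REDUCE
            || (type == TaskType.MAP && numReducers <= 0);
      }

      public static void main(String[] args) {
        System.out.println(shouldRecoverOutput(TaskType.MAP, 10));    // false
        System.out.println(shouldRecoverOutput(TaskType.MAP, 0));     // true
        System.out.println(shouldRecoverOutput(TaskType.REDUCE, 10)); // true
      }
    }
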
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Feb 10 01:49:08 2012
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -46,9 +45,9 @@ import org.apache.hadoop.yarn.api.Applic
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -149,7 +148,7 @@ public abstract class RMCommunicator ext
       LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
       LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
     } catch (Exception are) {
-      LOG.info("Exception while registering", are);
+      LOG.error("Exception while registering", are);
       throw new YarnException(are);
     }
   }
@@ -183,7 +182,7 @@ public abstract class RMCommunicator ext
       request.setTrackingUrl(historyUrl);
       scheduler.finishApplicationMaster(request);
     } catch(Exception are) {
-      LOG.info("Exception while unregistering ", are);
+      LOG.error("Exception while unregistering ", are);
     }
   }
 
@@ -205,7 +204,7 @@ public abstract class RMCommunicator ext
     try {
       allocatorThread.join();
     } catch (InterruptedException ie) {
-      LOG.info("InterruptedException while stopping", ie);
+      LOG.warn("InterruptedException while stopping", ie);
     }
     unregister();
     super.stop();
@@ -228,7 +227,7 @@ public abstract class RMCommunicator ext
               // TODO: for other exceptions
             }
           } catch (InterruptedException e) {
-            LOG.info("Allocated thread interrupted. Returning.");
+            LOG.warn("Allocated thread interrupted. Returning.");
             return;
           }
         }
@@ -255,7 +254,9 @@ public abstract class RMCommunicator ext
     if (UserGroupInformation.isSecurityEnabled()) {
       String tokenURLEncodedStr = System.getenv().get(
           ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      }
       Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
 
       try {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Feb 10 01:49:08 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -254,28 +255,30 @@ public class RMContainerAllocator extend
 
   @SuppressWarnings({ "unchecked" })
   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
-    LOG.info("Processing the event " + event.toString());
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
+      JobId jobId = getJob().getID();
+      int supportedMaxContainerCapability =
+          getMaxContainerCapability().getMemory();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
         if (mapResourceReqt == 0) {
           mapResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
           mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
               * minSlotMemSize;
-          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
+          eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
               mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
-          if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
+          if (mapResourceReqt > supportedMaxContainerCapability) {
             String diagMsg = "MAP capability required is more than the supported " +
             "max container capability in the cluster. Killing the Job. mapResourceReqt: " + 
-            mapResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+            mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                getJob().getID(), diagMsg));
-            eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+                jobId, diagMsg));
+            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
         //set the rounded off memory
@@ -288,20 +291,20 @@ public class RMContainerAllocator extend
           //round off on slotsize
           reduceResourceReqt = (int) Math.ceil((float) 
               reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
-          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
+          eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
               reduceResourceReqt)));
           LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-          if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
+          if (reduceResourceReqt > supportedMaxContainerCapability) {
             String diagMsg = "REDUCE capability required is more than the " +
             		"supported max container capability in the cluster. Killing the " +
             		"Job. reduceResourceReqt: " + reduceResourceReqt +
-            		" maxContainerCapability:" + getMaxContainerCapability().getMemory();
+            		" maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                getJob().getID(), diagMsg));
-            eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+                jobId, diagMsg));
+            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
         //set the rounded off memory
@@ -317,6 +320,9 @@ public class RMContainerAllocator extend
       
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+  
+      LOG.info("Processing the event " + event.toString());
+
       TaskAttemptId aId = event.getAttemptID();
       
       boolean removed = scheduledRequests.remove(aId);
@@ -543,6 +549,7 @@ public class RMContainerAllocator extend
       // This can happen when the connection to the RM has gone down. Keep
       // re-trying until the retryInterval has expired.
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +
@@ -578,7 +585,7 @@ public class RMContainerAllocator extend
     computeIgnoreBlacklisting();
     
     for (ContainerStatus cont : finishedContainers) {
-      LOG.info("Received completed container " + cont);
+      LOG.info("Received completed container " + cont.getContainerId());
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
       if (attemptID == null) {
         LOG.error("Container complete event for unknown container id "
@@ -663,7 +670,9 @@ public class RMContainerAllocator extend
             mapsHostMapping.put(host, list);
           }
           list.add(event.getAttemptID());
-          LOG.info("Added attempt req to host " + host);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Added attempt req to host " + host);
+          }
        }
        for (String rack: event.getRacks()) {
          LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
@@ -672,7 +681,9 @@ public class RMContainerAllocator extend
            mapsRackMapping.put(rack, list);
          }
          list.add(event.getAttemptID());
-         LOG.info("Added attempt req to rack " + rack);
+         if (LOG.isDebugEnabled()) {
+            LOG.debug("Added attempt req to rack " + rack);
+         }
        }
        request = new ContainerRequest(event, PRIORITY_MAP);
       }
@@ -693,18 +704,21 @@ public class RMContainerAllocator extend
       containersAllocated += allocatedContainers.size();
       while (it.hasNext()) {
         Container allocated = it.next();
-        LOG.info("Assigning container " + allocated.getId() +
-            " with priority " + allocated.getPriority() +
-            " to NM " + allocated.getNodeId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigning container " + allocated.getId()
+              + " with priority " + allocated.getPriority() + " to NM "
+              + allocated.getNodeId());
+        }
         
         // check if allocated container meets memory requirements 
         // and whether we have any scheduled tasks that need 
         // a container to be assigned
         boolean isAssignable = true;
         Priority priority = allocated.getPriority();
+        int allocatedMemory = allocated.getResource().getMemory();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocated.getResource().getMemory() < mapResourceReqt
+          if (allocatedMemory < mapResourceReqt
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
@@ -715,7 +729,7 @@ public class RMContainerAllocator extend
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocated.getResource().getMemory() < reduceResourceReqt
+          if (allocatedMemory < reduceResourceReqt
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "
@@ -729,15 +743,17 @@ public class RMContainerAllocator extend
         boolean blackListed = false;         
         ContainerRequest assigned = null;
         
+        ContainerId allocatedContainerId = allocated.getId();
         if (isAssignable) {
           // do not assign if allocated container is on a  
           // blacklisted host
-          blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+          String allocatedHost = allocated.getNodeId().getHost();
+          blackListed = isNodeBlacklisted(allocatedHost);
           if (blackListed) {
             // we need to request for a new container 
             // and release the current one
             LOG.info("Got allocated container on a blacklisted "
-                + " host "+allocated.getNodeId().getHost()
+                + " host "+allocatedHost
                 +". Releasing container " + allocated);
 
             // find the request matching this allocated container 
@@ -774,11 +790,13 @@ public class RMContainerAllocator extend
               eventHandler.handle(new TaskAttemptContainerAssignedEvent(
                   assigned.attemptID, allocated, applicationACLs));
 
-              assignedRequests.add(allocated.getId(), assigned.attemptID);
+              assignedRequests.add(allocatedContainerId, assigned.attemptID);
 
-              LOG.info("Assigned container (" + allocated + ") " +
-                  " to task " + assigned.attemptID +
-                  " on node " + allocated.getNodeId().toString());
+              if (LOG.isDebugEnabled()) {
+                LOG.info("Assigned container (" + allocated + ") "
+                    + " to task " + assigned.attemptID + " on node "
+                    + allocated.getNodeId().toString());
+              }
             }
             else {
               //not assigned to any request, release the container
@@ -793,7 +811,7 @@ public class RMContainerAllocator extend
         // or if we could not assign it 
         if (blackListed || assigned == null) {
           containersReleased++;
-          release(allocated.getId());
+          release(allocatedContainerId);
         }
       }
     }
@@ -806,10 +824,14 @@ public class RMContainerAllocator extend
         LOG.info("Assigning container " + allocated + " to fast fail map");
         assigned = assignToFailedMap(allocated);
       } else if (PRIORITY_REDUCE.equals(priority)) {
-        LOG.info("Assigning container " + allocated + " to reduce");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigning container " + allocated + " to reduce");
+        }
         assigned = assignToReduce(allocated);
       } else if (PRIORITY_MAP.equals(priority)) {
-        LOG.info("Assigning container " + allocated + " to map");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigning container " + allocated + " to map");
+        }
         assigned = assignToMap(allocated);
       } else {
         LOG.warn("Container allocated at unwanted priority: " + priority + 
@@ -896,7 +918,9 @@ public class RMContainerAllocator extend
         String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
-          LOG.info("Host matched to the request list " + host);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Host matched to the request list " + host);
+          }
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
             assigned = maps.remove(tId);
@@ -905,7 +929,9 @@ public class RMContainerAllocator extend
             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
             hostLocalAssigned++;
-            LOG.info("Assigned based on host match " + host);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Assigned based on host match " + host);
+            }
             break;
           }
         }
@@ -921,7 +947,9 @@ public class RMContainerAllocator extend
               jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
               eventHandler.handle(jce);
               rackLocalAssigned++;
-              LOG.info("Assigned based on rack match " + rack);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Assigned based on rack match " + rack);
+              }
               break;
             }
           }
@@ -932,7 +960,9 @@ public class RMContainerAllocator extend
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
             jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
-            LOG.info("Assigned based on * match");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Assigned based on * match");
+            }
             break;
           }
         }
@@ -952,8 +982,7 @@ public class RMContainerAllocator extend
       new HashSet<TaskAttemptId>();
     
     void add(ContainerId containerId, TaskAttemptId tId) {
-      LOG.info("Assigned container " + containerId.toString()
-          + " to " + tId);
+      LOG.info("Assigned container " + containerId.toString() + " to " + tId);
       containerToAttemptMap.put(containerId, tId);
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
         maps.put(tId, containerId);
@@ -962,6 +991,7 @@ public class RMContainerAllocator extend
       }
     }
 
+    @SuppressWarnings("unchecked")
     void preemptReduce(int toPreempt) {
       List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
         (reduces.keySet());

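Two details recur in the RMContainerAllocator hunks above: the first request
of each kind fixes the map/reduce memory requirement by rounding it up to a
multiple of the scheduler's minimum slot size, and the job is killed if that
rounded figure exceeds the cluster maximum (now read once into
supportedMaxContainerCapability). The arithmetic, as a small illustrative
program with assumed sample values:

    public class ResourceRoundingSketch {
      // Round a memory request up to a multiple of the minimum slot size.
      static int roundUpToSlot(int requestedMb, int minSlotMb) {
        return (int) Math.ceil((float) requestedMb / minSlotMb) * minSlotMb;
      }

      public static void main(String[] args) {
        int minSlotMb = 1024, supportedMaxContainerCapability = 8192;
        int mapResourceReqt = roundUpToSlot(1500, minSlotMb); // -> 2048
        System.out.println("mapResourceReqt:" + mapResourceReqt);
        if (mapResourceReqt > supportedMaxContainerCapability) {
          System.out.println("MAP capability required is more than the "
              + "supported max container capability in the cluster.");
        }
      }
    }
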
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Feb 10 01:49:08 2012
@@ -155,13 +155,14 @@ public abstract class RMContainerRequest
     lastClusterNmCount = clusterNmCount;
     clusterNmCount = allocateResponse.getNumClusterNodes();
 
-    LOG.info("getResources() for " + applicationId + ":" + " ask="
-        + ask.size() + " release= " + release.size() + 
-        " newContainers=" + response.getAllocatedContainers().size() + 
-        " finishedContainers=" + 
-        response.getCompletedContainersStatuses().size() + 
-        " resourcelimit=" + availableResources + 
-        " knownNMs=" + clusterNmCount);
+    if (ask.size() > 0 || release.size() > 0) {
+      LOG.info("getResources() for " + applicationId + ":" + " ask="
+          + ask.size() + " release= " + release.size() + " newContainers="
+          + response.getAllocatedContainers().size() + " finishedContainers="
+          + response.getCompletedContainersStatuses().size()
+          + " resourcelimit=" + availableResources + " knownNMs="
+          + clusterNmCount);
+    }
 
     ask.clear();
     release.clear();
@@ -172,6 +173,9 @@ public abstract class RMContainerRequest
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.
   protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
     if (blacklistDisablePercent != -1
         && (blacklistedNodeCount != blacklistedNodes.size() ||
             clusterNmCount != lastClusterNmCount)) {
@@ -200,7 +204,9 @@ public abstract class RMContainerRequest
       return;
     }
     if (blacklistedNodes.contains(hostName)) {
-      LOG.info("Host " + hostName + " is already blacklisted.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Host " + hostName + " is already blacklisted.");
+      }
       return; //already blacklisted
     }
     Integer failures = nodeFailures.remove(hostName);
@@ -293,7 +299,9 @@ public abstract class RMContainerRequest
     if (remoteRequests == null) {
       remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
       this.remoteRequestsTable.put(priority, remoteRequests);
-      LOG.info("Added priority=" + priority);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
     }
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
     if (reqMap == null) {
@@ -313,10 +321,12 @@ public abstract class RMContainerRequest
 
     // Note this down for next interaction with ResourceManager
     ask.add(remoteRequest);
-    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
   }
 
   private void decResourceRequest(Priority priority, String resourceName,
@@ -328,16 +338,20 @@ public abstract class RMContainerRequest
       // as we modify the resource requests by filtering out blacklisted hosts 
       // when they are added, this value may be null when being 
       // decremented
-      LOG.debug("Not decrementing resource as " + resourceName
-          + " is not present in request table");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
       return;
     }
     ResourceRequest remoteRequest = reqMap.get(capability);
 
-    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
 
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
     if (remoteRequest.getNumContainers() == 0) {
@@ -355,10 +369,12 @@ public abstract class RMContainerRequest
       //already have it.
     }
 
-    LOG.info("AFTER decResourceRequest:" + " applicationId="
-             + applicationId.getId() + " priority=" + priority.getPriority()
-             + " resourceName=" + resourceName + " numContainers="
-             + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.info("AFTER decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
   }
 
   protected void release(ContainerId containerId) {

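computeIgnoreBlacklisting() now returns immediately when node blacklisting is
disabled instead of re-evaluating the disable threshold on every call. A
hedged sketch of the surrounding idea visible in the context lines, that
blacklisting is ignored once blacklisted hosts pass a configured percentage
of the cluster (field names follow the diff, but the class and the exact
percentage math are assumptions):

    public class BlacklistThresholdSketch {
      boolean nodeBlacklistingEnabled = true;
      int blacklistDisablePercent = 33; // -1 would mean "never disable"
      boolean ignoreBlacklisting = false;

      void computeIgnoreBlacklisting(int blacklistedHosts, int clusterNmCount) {
        if (!nodeBlacklistingEnabled) {
          return; // same early exit the commit adds
        }
        if (blacklistDisablePercent != -1 && clusterNmCount > 0) {
          int percent = blacklistedHosts * 100 / clusterNmCount;
          ignoreBlacklisting = percent >= blacklistDisablePercent;
        }
      }
    }
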
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java Fri Feb 10 01:49:08 2012
@@ -95,7 +95,13 @@ public class AppController extends Contr
    * Render the /job page
    */
   public void job() {
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     render(jobPage());
   }
 
@@ -110,7 +116,13 @@ public class AppController extends Contr
    * Render the /jobcounters page
    */
   public void jobCounters() {
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     if (app.getJob() != null) {
       setTitle(join("Counters for ", $(JOB_ID)));
     }
@@ -121,7 +133,13 @@ public class AppController extends Contr
    * Display a page showing a task's counters
    */
   public void taskCounters() {
-    requireTask();
+    try {
+      requireTask();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     if (app.getTask() != null) {
       setTitle(StringHelper.join("Counters for ", $(TASK_ID)));
     }
@@ -140,7 +158,13 @@ public class AppController extends Contr
    * @throws IOException on any error.
    */
   public void singleJobCounter() throws IOException{
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
     set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
     if (app.getJob() != null) {
@@ -155,7 +179,13 @@ public class AppController extends Contr
    * @throws IOException on any error.
    */
   public void singleTaskCounter() throws IOException{
-    requireTask();
+    try {
+      requireTask();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
     set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
     if (app.getTask() != null) {
@@ -176,7 +206,13 @@ public class AppController extends Contr
    * Render the /tasks page
    */
   public void tasks() {
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     if (app.getJob() != null) {
       try {
         String tt = $(TASK_TYPE);
@@ -201,7 +237,13 @@ public class AppController extends Contr
    * Render the /task page
    */
   public void task() {
-    requireTask();
+    try {
+      requireTask();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     if (app.getTask() != null) {
       setTitle(join("Attempts for ", $(TASK_ID)));
     }
@@ -219,7 +261,13 @@ public class AppController extends Contr
    * Render the attempts page
    */
   public void attempts() {
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     if (app.getJob() != null) {
       try {
         String taskType = $(TASK_TYPE);
@@ -252,6 +300,12 @@ public class AppController extends Contr
   */
  public void conf() {
-    requireJob();
+    try {
+      requireJob();
+    }
+    catch (Exception e) {
+      renderText(e.getMessage());
+      return;
+    }
     render(confPage());
   }
 
@@ -280,41 +335,43 @@ public class AppController extends Contr
   void accessDenied(String s) {
     setStatus(HttpServletResponse.SC_FORBIDDEN);
     setTitle(join("Access denied: ", s));
-    throw new RuntimeException("Access denied: " + s);
   }
 
   /**
    * check for job access.
    * @param job the job that is being accessed
+   * @return True if the requesting user has permission to view the job
    */
-  void checkAccess(Job job) {
+  boolean checkAccess(Job job) {
     UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
         request().getRemoteUser());
-    if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
-      accessDenied("User " + request().getRemoteUser() + " does not have " +
-          " permissions.");
-    }
+    return job.checkAccess(callerUgi, JobACL.VIEW_JOB);
   }
 
   /**
    * Ensure that a JOB_ID was passed into the page.
    */
   public void requireJob() {
-    try {
-      if ($(JOB_ID).isEmpty()) {
-        throw new RuntimeException("missing job ID");
-      }
-      JobId jobID = MRApps.toJobID($(JOB_ID));
-      app.setJob(app.context.getJob(jobID));
-      if (app.getJob() == null) {
-        notFound($(JOB_ID));
-      }
-      /* check for acl access */
-      Job job = app.context.getJob(jobID);
-      checkAccess(job);
-    } catch (Exception e) {
-      badRequest(e.getMessage() == null ? 
-          e.getClass().getName() : e.getMessage());
+    if ($(JOB_ID).isEmpty()) {
+      badRequest("missing job ID");
+      throw new RuntimeException("Bad Request: Missing job ID");
+    }
+
+    JobId jobID = MRApps.toJobID($(JOB_ID));
+    app.setJob(app.context.getJob(jobID));
+    if (app.getJob() == null) {
+      notFound($(JOB_ID));
+      throw new RuntimeException("Not Found: " + $(JOB_ID));
+    }
+
+    /* check for acl access */
+    Job job = app.context.getJob(jobID);
+    if (!checkAccess(job)) {
+      accessDenied("User " + request().getRemoteUser() + " does not have " +
+          " permission to view job " + $(JOB_ID));
+      throw new RuntimeException("Access denied: User " +
+          request().getRemoteUser() + " does not have permission to view job " +
+          $(JOB_ID));
     }
   }
 
@@ -322,24 +379,30 @@ public class AppController extends Contr
    * Ensure that a TASK_ID was passed into the page.
    */
   public void requireTask() {
-    try {
-      if ($(TASK_ID).isEmpty()) {
-        throw new RuntimeException("missing task ID");
-      }
-      TaskId taskID = MRApps.toTaskID($(TASK_ID));
-      Job job = app.context.getJob(taskID.getJobId());
-      app.setJob(job);
-      if (app.getJob() == null) {
-        notFound(MRApps.toString(taskID.getJobId()));
-      } else {
-        app.setTask(app.getJob().getTask(taskID));
-        if (app.getTask() == null) {
-          notFound($(TASK_ID));
-        }
+    if ($(TASK_ID).isEmpty()) {
+      badRequest("missing task ID");
+      throw new RuntimeException("missing task ID");
+    }
+
+    TaskId taskID = MRApps.toTaskID($(TASK_ID));
+    Job job = app.context.getJob(taskID.getJobId());
+    app.setJob(job);
+    if (app.getJob() == null) {
+      notFound(MRApps.toString(taskID.getJobId()));
+      throw new RuntimeException("Not Found: " + $(JOB_ID));
+    } else {
+      app.setTask(app.getJob().getTask(taskID));
+      if (app.getTask() == null) {
+        notFound($(TASK_ID));
+        throw new RuntimeException("Not Found: " + $(TASK_ID));
       }
-      checkAccess(job);
-    } catch (Exception e) {
-      badRequest(e.getMessage());
+    }
+    if (!checkAccess(job)) {
+      accessDenied("User " + request().getRemoteUser() + " does not have " +
+          " permission to view job " + $(JOB_ID));
+      throw new RuntimeException("Access denied: User " +
+          request().getRemoteUser() + " does not have permission to view job " +
+          $(JOB_ID));
     }
   }
 }

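The AppController rework above converges on one shape: requireJob() and
requireTask() set an HTTP status (badRequest, notFound, accessDenied) and
then throw, while each page action catches the exception, renders its message
as text, and returns instead of proceeding to the page. A minimal sketch of
that controller shape (names and the String return are illustrative):

    public class RequireThenRenderSketch {
      void requireJob(String jobId) {
        if (jobId == null || jobId.isEmpty()) {
          // setStatus(...) would go here in the real controller
          throw new RuntimeException("Bad Request: Missing job ID");
        }
      }

      String job(String jobId) {
        try {
          requireJob(jobId);
        } catch (Exception e) {
          return e.getMessage(); // stands in for renderText(e.getMessage())
        }
        return "jobPage"; // stands in for render(jobPage())
      }
    }
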
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppView.java Fri Feb 10 01:49:08 2012
@@ -33,7 +33,6 @@ public class AppView extends TwoColumnLa
   }
 
   protected void commonPreHead(Page.HTML<_> html) {
-    html.meta_http("refresh", "10");
     set(ACCORDION_ID, "nav");
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
     set(THEMESWITCHER_ID, "themeswitcher");

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java Fri Feb 10 01:49:08 2012
@@ -27,6 +27,10 @@ public class CountersPage extends AppVie
 
   @Override protected void preHead(Page.HTML<_> html) {
     commonPreHead(html);
+
+    // Counters page is a summary. Helps to refresh automatically.
+    html.meta_http("refresh", "10");
+
     String tid = $(TASK_ID);
     String activeNav = "3";
     if(tid == null || tid.isEmpty()) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java Fri Feb 10 01:49:08 2012
@@ -32,6 +32,10 @@ public class JobPage extends AppView {
     set(TITLE, jobID.isEmpty() ? "Bad request: missing job ID"
                : join("MapReduce Job ", $(JOB_ID)));
     commonPreHead(html);
+
+    // This is a job-summary page. Helps to refresh automatically.
+    html.meta_http("refresh", "10");
+
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Feb 10 01:49:08 2012
@@ -116,6 +116,11 @@ public class TaskPage extends AppView {
 
   @Override protected void preHead(Page.HTML<_> html) {
     commonPreHead(html);
+
+    // This page is a list of all attempts which are limited in number. Okay to
+    // refresh automatically.
+    html.meta_http("refresh", "10");
+
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:3}");
     set(DATATABLES_ID, "attempts");
     set(initID(DATATABLES, "attempts"), attemptsTableInit());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Feb 10 01:49:08 2012
@@ -436,7 +436,7 @@ public class MRApp extends MRAppMaster {
     return new ClientService(){
       @Override
       public InetSocketAddress getBindAddress() {
-        return null;
+        return NetUtils.createSocketAddr("localhost:9876");
       }
 
       @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Fri Feb 10 01:49:08 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -29,16 +31,30 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Test;
 
 public class MRAppBenchmark {
 
@@ -167,17 +183,89 @@ public class MRAppBenchmark {
     }
   }
 
+  @Test
   public void benchmark1() throws Exception {
-    int maps = 100000;
-    int reduces = 100;
+    int maps = 100; // Adjust for benchmarking; start with thousands.
+    int reduces = 0;
     System.out.println("Running benchmark with maps:"+maps +
         " reduces:"+reduces);
-    run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
+    run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+      @Override
+      protected ContainerAllocator createContainerAllocator(
+          ClientService clientService, AppContext context) {
+        return new RMContainerAllocator(clientService, context) {
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return new AMRMProtocol() {
+
+              @Override
+              public RegisterApplicationMasterResponse
+                  registerApplicationMaster(
+                      RegisterApplicationMasterRequest request)
+                      throws YarnRemoteException {
+                RegisterApplicationMasterResponse response =
+                    Records.newRecord(RegisterApplicationMasterResponse.class);
+                response.setMinimumResourceCapability(BuilderUtils
+                  .newResource(1024));
+                response.setMaximumResourceCapability(BuilderUtils
+                  .newResource(10240));
+                return response;
+              }
+
+              @Override
+              public FinishApplicationMasterResponse finishApplicationMaster(
+                  FinishApplicationMasterRequest request)
+                  throws YarnRemoteException {
+                FinishApplicationMasterResponse response =
+                    Records.newRecord(FinishApplicationMasterResponse.class);
+                return response;
+              }
+
+              @Override
+              public AllocateResponse allocate(AllocateRequest request)
+                  throws YarnRemoteException {
+
+                AllocateResponse response =
+                    Records.newRecord(AllocateResponse.class);
+                List<ResourceRequest> askList = request.getAskList();
+                List<Container> containers = new ArrayList<Container>();
+                for (ResourceRequest req : askList) {
+                  if (!"*".equals(req.getHostName())) {
+                    continue;
+                  }
+                  int numContainers = req.getNumContainers();
+                  for (int i = 0; i < numContainers; i++) {
+                    ContainerId containerId =
+                        BuilderUtils.newContainerId(
+                          request.getApplicationAttemptId(),
+                          request.getResponseId() + i);
+                    containers.add(BuilderUtils
+                      .newContainer(containerId, BuilderUtils.newNodeId("host"
+                          + containerId.getId(), 2345),
+                        "host" + containerId.getId() + ":5678", req
+                          .getCapability(), req.getPriority(), null));
+                  }
+                }
+
+                AMResponse amResponse = Records.newRecord(AMResponse.class);
+                amResponse.setAllocatedContainers(containers);
+                amResponse.setResponseId(request.getResponseId() + 1);
+                response.setAMResponse(amResponse);
+                response.setNumClusterNodes(350);
+                return response;
+              }
+            };
+          }
+        };
+      }
+    });
   }
 
+  @Test
   public void benchmark2() throws Exception {
-    int maps = 4000;
-    int reduces = 1000;
+    int maps = 100; // Adjust for benchmarking; start with a couple of thousand.
+    int reduces = 50;
     int maxConcurrentRunningTasks = 500;
     
     System.out.println("Running benchmark with throttled running tasks with " +

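benchmark1 above swaps the real ResourceManager for an in-process AMRMProtocol
stub that grants every "*" ask immediately, so the allocation path runs as
fast as the AM can ask. One detail worth calling out in the allocate loop is
the string comparison: the ask's host name must be compared with equals(),
because Java's == and != compare object identity, not contents. A small
self-contained sketch of the pitfall (plain JDK, no Hadoop classes assumed):

    public class StringCompareDemo {
      public static void main(String[] args) {
        String literal = "*";
        // Built at runtime, so it is a distinct object from the literal.
        String built = new StringBuilder().append('*').toString();
        System.out.println(literal == built);      // false: different objects
        System.out.println(literal.equals(built)); // true: same contents
      }
    }

Records coming off the wire are deserialized rather than interned, so an
identity check against a literal can silently skip every request.
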
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Feb 10 01:49:08 2012
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -35,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.junit.Test;
 
 /**
@@ -175,6 +180,41 @@ public class TestMRApp {
     app.waitForState(job, JobState.ERROR);
   }
 
+  private final class MRAppWithSpiedJob extends MRApp {
+    private JobImpl spiedJob;
+
+    private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      spiedJob = spy((JobImpl) super.createJob(conf));
+      ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
+      return spiedJob;
+    }
+
+    JobImpl getSpiedJob() {
+      return this.spiedJob;
+    }
+  }
+
+  @Test
+  public void testCountersOnJobFinish() throws Exception {
+    MRAppWithSpiedJob app =
+        new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    System.out.println(job.getAllCounters());
+    // Just call getCounters
+    job.getAllCounters();
+    job.getAllCounters();
+    // Should be called only once
+    verify(job, times(1)).constructFinalFullcounters();
+  }
+
   @Test
   public void checkJobStateTypeConversion() {
     //verify that all states can be converted without 
@@ -200,5 +240,6 @@ public class TestMRApp {
     t.testCommitPending();
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();
+    t.testCountersOnJobFinish();
   }
 }

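testCountersOnJobFinish above leans on Mockito's spy/verify pattern: the real
JobImpl still runs, but the proxy counts invocations so the test can assert
that the expensive counter construction happened exactly once across repeated
getAllCounters() calls. A minimal sketch of the pattern, assuming Mockito on
the classpath (the Greeter class is hypothetical):

    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class SpyDemo {
      static class Greeter {
        private String cached;
        String greet() {
          if (cached == null) {
            cached = buildGreeting(); // expensive; should run only once
          }
          return cached;
        }
        String buildGreeting() { return "hello"; }
      }

      public static void main(String[] args) {
        Greeter g = spy(new Greeter());
        g.greet();
        g.greet();
        // The cache means the expensive helper ran exactly once.
        verify(g, times(1)).buildGreeting();
      }
    }

Because a spy proxies a real instance, internal virtual calls (greet()
invoking buildGreeting()) are intercepted and counted, which is what lets the
test observe the caching behavior without modifying the class under test.
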
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Fri Feb 10 01:49:08 2012
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@@ -112,7 +113,7 @@ public class TestRecovery {
     Assert.assertEquals("Reduce Task state not correct",
         TaskState.RUNNING, reduceTask.getReport().getTaskState());
     
-  //send the fail signal to the 1st map task attempt
+    //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task1Attempt1.getID(),
@@ -193,7 +194,7 @@ public class TestRecovery {
     //RUNNING state
     app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
     
-  //send the done signal to the 2nd map task
+    //send the done signal to the 2nd map task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             mapTask2.getAttempts().values().iterator().next().getID(),
@@ -349,6 +350,151 @@ public class TestRecovery {
     validateOutput();
   }
 
+  @Test
+  public void testOutputRecoveryMapsOnly() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    // write output corresponding to map1 (this is just to validate that it
+    // is not included in the final output)
+    writeBadOutput(task1Attempt1, conf);
+    
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    //stop the app before the job completes.
+    app.stop();
+    
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask1 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task2Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+  
+  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+      throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key2, val2);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val2);
+      theRecordWriter.write(nullWritable, val1);
+      theRecordWriter.write(key1, nullWritable);
+      theRecordWriter.write(key2, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key1, val1);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+
   private void writeOutput(TaskAttempt attempt, Configuration conf)
     throws Exception {
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 

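writeBadOutput above closes the RecordWriter in a finally block so the handle
is released even if a write throws, and only then commits the task output.
The same discipline in a standalone form (plain JDK assumed; the file name is
arbitrary):

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;

    public class CloseInFinallyDemo {
      public static void main(String[] args) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter("out.txt"));
        try {
          writer.write("key1\tval1\n"); // any write may throw mid-stream
          writer.write("key2\tval2\n");
        } finally {
          writer.close(); // always runs, so no handle leaks on failure
        }
      }
    }
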
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Feb 10 01:49:08 2012
@@ -18,42 +18,40 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.junit.Test;
 import org.junit.Assert;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.any;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
+import org.junit.Test;
 
 
 /**
  * Tests various functions of the JobImpl class
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestJobImpl {
   
   @Test
@@ -100,7 +98,9 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);    
+        JobImpl.checkJobCompleteSuccess(mockJob), JobState.SUCCEEDED);
+
+    
   }
 
   @Test
@@ -133,5 +133,63 @@ public class TestJobImpl {
     t.testJobNoTasksTransition();
     t.testCheckJobCompleteSuccess();
     t.testCheckJobCompleteSuccessFailed();
+    t.testCheckAccess();
+  }
+
+  @Test
+  public void testCheckAccess() {
+    // Create two unique users
+    String user1 = System.getProperty("user.name");
+    String user2 = user1 + "1234";
+    UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1);
+    UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2);
+
+    // Create the job
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+
+    // Setup configuration access only to user1 (owner)
+    Configuration conf1 = new Configuration();
+    conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access to the user1 (owner) and user2
+    Configuration conf2 = new Configuration();
+    conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2);
+
+    // Verify access
+    JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access with security enabled and access to all
+    Configuration conf3 = new Configuration();
+    conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*");
+
+    // Verify access
+    JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
+
+    // Setup configuration access without security enabled
+    Configuration conf4 = new Configuration();
+    conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
+
+    // Verify access
+    JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
+        null, null, null, true, null, 0, null);
+    Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
+    Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
   }
 }

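testCheckAccess above pins down the view-ACL semantics: the job owner can
always view, an empty ACL shuts everyone else out, a named user is let in, a
"*" wildcard admits everyone, and with ACLs disabled every check passes. A
simplified standalone model of those rules (plain JDK; this is a sketch of
the semantics, not JobImpl's actual implementation):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ViewAclDemo {
      static boolean checkAccess(boolean aclsEnabled, String owner,
          String viewAcl, String user) {
        if (!aclsEnabled) {
          return true;                 // security off: everyone may view
        }
        if (user.equals(owner)) {
          return true;                 // the owner may always view
        }
        if ("*".equals(viewAcl.trim())) {
          return true;                 // wildcard: world-viewable
        }
        Set<String> allowed = new HashSet<String>(
            Arrays.asList(viewAcl.trim().split("\\s*,\\s*")));
        return allowed.contains(user); // otherwise only listed users
      }

      public static void main(String[] args) {
        System.out.println(checkAccess(true, "alice", "", "alice"));  // true
        System.out.println(checkAccess(true, "alice", "", "bob"));    // false
        System.out.println(checkAccess(true, "alice", "bob", "bob")); // true
        System.out.println(checkAccess(true, "alice", "*", "bob"));   // true
        System.out.println(checkAccess(false, "alice", "", "bob"));   // true
      }
    }
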
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Feb 10 01:49:08 2012
@@ -18,30 +18,54 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings("unchecked")
 public class TestTaskAttempt{
@@ -58,6 +82,96 @@ public class TestTaskAttempt{
     testMRAppHistory(app);
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testSingleRackRequest() throws Exception {
+    TaskAttemptImpl.RequestContainerTransition rct =
+        new TaskAttemptImpl.RequestContainerTransition(false);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "host1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskSplitMetaInfo splitInfo =
+        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024L);
+
+    TaskAttemptImpl mockTaskAttempt =
+        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+
+    rct.transition(mockTaskAttempt, mockTAEvent);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    ContainerRequestEvent cre =
+        (ContainerRequestEvent) arg.getAllValues().get(1);
+    String[] requestedRacks = cre.getRacks();
+    // Only a single occurrence of /DefaultRack
+    assertEquals(1, requestedRacks.length);
+  }
+ 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testHostResolveAttempt() throws Exception {
+    TaskAttemptImpl.RequestContainerTransition rct =
+        new TaskAttemptImpl.RequestContainerTransition(false);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "192.168.1.1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskSplitMetaInfo splitInfo =
+        new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024L);
+
+    TaskAttemptImpl mockTaskAttempt =
+        createMapTaskAttemptImplForTest(eventHandler, splitInfo);
+    TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+    when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+
+    TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
+    rct.transition(spyTa, mockTAEvent);
+    verify(spyTa).resolveHost(hosts[0]);
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    if (!(arg.getAllValues().get(1) instanceof ContainerRequestEvent)) {
+      Assert.fail("Second Event not of type ContainerRequestEvent");
+    }
+    Map<String, Boolean> expected = new HashMap<String, Boolean>();
+    expected.put("host1", true);
+    expected.put("host2", true);
+    expected.put("host3", true);
+    ContainerRequestEvent cre =
+        (ContainerRequestEvent) arg.getAllValues().get(1);
+    String[] requestedHosts = cre.getHosts();
+    for (String h : requestedHosts) {
+      expected.remove(h);
+    }
+    assertEquals(0, expected.size());
+  }
+
+  @SuppressWarnings("rawtypes")
+  private TaskAttemptImpl createMapTaskAttemptImplForTest(
+      EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    Path jobFile = mock(Path.class);
+    JobConf jobConf = new JobConf();
+    OutputCommitter outputCommitter = mock(OutputCommitter.class);
+    Clock clock = new SystemClock();
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            null, clock);
+    return taImpl;
+  }
+
   private void testMRAppHistory(MRApp app) throws Exception {
     Configuration conf = new Configuration();
     Job job = app.submit(conf);

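Both new tests above use Mockito's ArgumentCaptor to pull the events that the
mocked EventHandler received back out for inspection, then assert on the
second one. The pattern in isolation (Mockito assumed on the classpath; the
Sink interface is hypothetical):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    import org.mockito.ArgumentCaptor;

    public class CaptorDemo {
      interface Sink {
        void handle(String event);
      }

      public static void main(String[] args) {
        Sink sink = mock(Sink.class);
        sink.handle("first");
        sink.handle("second");

        ArgumentCaptor<String> arg = ArgumentCaptor.forClass(String.class);
        verify(sink, times(2)).handle(arg.capture());
        // getAllValues() preserves call order; index 1 is the second event.
        System.out.println(arg.getAllValues().get(1)); // prints "second"
      }
    }
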
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Feb 10 01:49:08 2012
@@ -59,7 +59,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings("rawtypes")
 public class TestTaskImpl {
 
   private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);    

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Feb 10 01:49:08 2012
@@ -777,7 +777,7 @@ public class TestAMWebServicesJobs exten
       assertTrue("name not set", (name != null && !name.isEmpty()));
       JSONArray counters = counterGroup.getJSONArray("counter");
       for (int j = 0; j < counters.length(); j++) {
-        JSONObject counter = counters.getJSONObject(i);
+        JSONObject counter = counters.getJSONObject(j);
         String counterName = counter.getString("name");
         assertTrue("counter name not set",
             (counterName != null && !counterName.isEmpty()));
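The one-character hunk above fixes a classic nested-loop slip: the inner
array was indexed with the outer counter, so one element per group was
re-read and the lookup could run past the end of a group with fewer counters
than its index. A minimal illustration of the failure mode (plain JDK):

    public class LoopIndexDemo {
      public static void main(String[] args) {
        String[][] groups = { { "a", "b" }, { "c", "d" }, { "e", "f" } };
        for (int i = 0; i < groups.length; i++) {
          for (int j = 0; j < groups[i].length; j++) {
            // With groups[i][i], i == 2 would index past the two-element
            // inner arrays and throw; groups[i][j] visits each element once.
            System.out.println(groups[i][j]);
          }
        }
      }
    }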