Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/07/26 08:40:28 UTC

svn commit: r1150993 - in /hadoop/common/branches/MR-279/mapreduce: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/h...

Author: sharad
Date: Tue Jul 26 06:40:26 2011
New Revision: 1150993

URL: http://svn.apache.org/viewvc?rev=1150993&view=rev
Log:
MAPREDUCE-2664. Implement JobCounters for MRv2. Contributed by Siddharth Seth.

Added:
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Tue Jul 26 06:40:26 2011
@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    MAPREDUCE-2664. Implement JobCounters for MRv2. (Siddharth Seth via 
+    sharad)
+
     MAPREDUCE-2667. mapred job -kill leaves application in RUNNING state 
     (thomas graves via mahadev)
     

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Jul 26 06:40:26 2011
@@ -36,12 +36,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -205,9 +207,23 @@ public class LocalContainerLauncher exte
           }
 
           try {
+            if (remoteTask.isMapOrReduce()) {
+              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+              if (remoteTask.isMapTask()) {
+                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+              } else {
+                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+              }
+              context.getEventHandler().handle(jce);
+            }
             runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
                        (numReduceTasks > 0));
+            
           } catch (RuntimeException re) {
+            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+            context.getEventHandler().handle(jce);
             // this is our signal that the subtask failed in some way, so
             // simulate a failed JVM/container and send a container-completed
             // event to task attempt (i.e., move state machine from RUNNING

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Jul 26 06:40:26 2011
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -462,6 +464,7 @@ public class JobHistoryEventHandler exte
                 .toString());
       // TODO JOB_FINISHED does not have state. Effectively job history does not
       // have state about the finished job.
+      setSummarySlotSeconds(summary, jobId);
       break;
     case JOB_FAILED:
     case JOB_KILLED:
@@ -470,10 +473,26 @@ public class JobHistoryEventHandler exte
       summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
       summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
       summary.setJobFinishTime(juce.getFinishTime());
+      setSummarySlotSeconds(summary, jobId);
       break;
     }
   }
 
+  private void setSummarySlotSeconds(JobSummary summary, JobId jobId) {
+    Counter slotMillisMapCounter =
+        context.getJob(jobId).getCounters()
+            .getCounter(JobCounter.SLOTS_MILLIS_MAPS);
+    if (slotMillisMapCounter != null) {
+      summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
+    }
+    Counter slotMillisReduceCounter =
+        context.getJob(jobId).getCounters()
+            .getCounter(JobCounter.SLOTS_MILLIS_REDUCES);
+    if (slotMillisReduceCounter != null) {
+      summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue());
+    }
+  }
+
   protected void closeEventWriter(JobId jobId) throws IOException {
 
     final MetaInfo mi = fileMap.get(jobId);

Added: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java?rev=1150993&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java Tue Jul 26 06:40:26 2011
@@ -0,0 +1,42 @@
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobCounterUpdateEvent extends JobEvent {
+
+  List<CounterIncrementalUpdate> counterUpdates = null;
+  
+  public JobCounterUpdateEvent(JobId jobId) {
+    super(jobId, JobEventType.JOB_COUNTER_UPDATE);
+    counterUpdates = new ArrayList<JobCounterUpdateEvent.CounterIncrementalUpdate>();
+  }
+
+  public void addCounterUpdate(Enum<?> key, long incrValue) {
+    counterUpdates.add(new CounterIncrementalUpdate(key, incrValue));
+  }
+  
+  public List<CounterIncrementalUpdate> getCounterUpdates() {
+    return counterUpdates;
+  }
+  
+  public static class CounterIncrementalUpdate {
+    Enum<?> key;
+    long incrValue;
+    
+    public CounterIncrementalUpdate(Enum<?> key, long incrValue) {
+      this.key = key;
+      this.incrValue = incrValue;
+    }
+    
+    public Enum<?> getCounterKey() {
+      return key;
+    }
+
+    public long getIncrementValue() {
+      return incrValue;
+    }
+  }
+}

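The class above defines the event that carries job-level counter increments through the AM dispatcher; every other file in this commit either produces or consumes it. A minimal sketch of the producer side, assuming an EventHandler and JobId are available in the enclosing component (the helper and its names are illustrative, not part of the commit):

import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.yarn.event.EventHandler;

public class CounterUpdateSketch {
  // Illustrative helper: batch two increments into one event and hand it
  // to the dispatcher; JobImpl applies them in CounterUpdateTransition.
  static void recordLaunchedUberMap(EventHandler<JobCounterUpdateEvent> handler,
      JobId jobId) {
    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(jobId);
    jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
    jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
    handler.handle(jce);
  }
}
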
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Jul 26 06:40:26 2011
@@ -41,6 +41,7 @@ public enum JobEventType {
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
+  JOB_COUNTER_UPDATE,
   
   //Producer:TaskAttemptListener
   JOB_TASK_ATTEMPT_FETCH_FAILURE

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jul 26 06:40:26 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -149,12 +150,8 @@ public class JobImpl implements org.apac
   private boolean lazyTasksCopyNeeded = false;
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
   private Counters jobCounters = newCounters();
-    // FIXME:  support job-level counters
+    // FIXME:  
     //
-    // Presumably want to define new event type that job-related entities
-    // (e.g., MRAppMaster or LocalContainerLauncher) can emit with some sort
-    // of payload (maybe just Counters?); then define new Job state-machine
-    // transition to handle the event and update jobCounters with payload data.
     // Can then replace task-level uber counters (MR-2424) with job-level ones
     // sent from LocalContainerLauncher, and eventually including a count of
     // of uber-AM attempts (probably sent from MRAppMaster).
@@ -184,6 +181,8 @@ public class JobImpl implements org.apac
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
+  private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
+      new CounterUpdateTransition();
 
   protected static final
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
@@ -195,6 +194,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.NEW, JobState.NEW,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.NEW, JobState.NEW,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
               (JobState.NEW,
               EnumSet.of(JobState.INITED, JobState.FAILED),
@@ -211,6 +212,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.INITED, JobState.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.INITED, JobState.INITED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(JobState.INITED, JobState.RUNNING,
               JobEventType.JOB_START,
               new StartTransition())
@@ -244,6 +247,8 @@ public class JobImpl implements org.apac
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(JobState.RUNNING, JobState.RUNNING,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               new TaskAttemptFetchFailureTransition())
           .addTransition(
@@ -263,6 +268,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
               JobState.KILL_WAIT,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -277,6 +284,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
               JobState.SUCCEEDED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -290,6 +299,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.FAILED, JobState.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.FAILED, JobState.FAILED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
               JobState.FAILED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -303,6 +314,8 @@ public class JobImpl implements org.apac
           .addTransition(JobState.KILLED, JobState.KILLED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.KILLED, JobState.KILLED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
               JobState.KILLED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -460,7 +473,6 @@ public class JobImpl implements org.apac
   @Override
   public Counters getCounters() {
     Counters counters = newCounters();
-    // TODO: compute job-level counters
     readLock.lock();
     try {
       incrAllCounters(counters, jobCounters);
@@ -500,7 +512,6 @@ public class JobImpl implements org.apac
   public static Counters newCounters() {
     Counters counters = RecordFactoryProvider.getRecordFactory(null)
         .newRecordInstance(Counters.class);
-//    counters.groups = new HashMap<String, CounterGroup>();
     return counters;
   }
 
@@ -519,7 +530,6 @@ public class JobImpl implements org.apac
         if (group == null) {
           group = RecordFactoryProvider.getRecordFactory(null)
               .newRecordInstance(CounterGroup.class);
-//          group.counters = new HashMap<CharSequence, Counter>();
           group.setName(otherGroup.getName());
           counters.setCounterGroup(group.getName(), group);
         }
@@ -1363,7 +1373,7 @@ public class JobImpl implements org.apac
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }
-
+  
   private static class DiagnosticsUpdateTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
@@ -1372,6 +1382,18 @@ public class JobImpl implements org.apac
           .getDiagnosticUpdate());
     }
   }
+  
+  private static class CounterUpdateTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
+      for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
+          .getCounterUpdates()) {
+        job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue());
+      }
+    }
+  }
 
   private static class InternalErrorTransition implements
       SingleArcTransition<JobImpl, JobEvent> {

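With the JOB_COUNTER_UPDATE transition registered for every job state, the accumulated values become visible through the normal Job.getCounters() path, which is how the history handler reads SLOTS_MILLIS_* above. A hedged sketch of a reader, assuming an AppContext and JobId are in scope (names illustrative):

import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;

public class CounterReaderSketch {
  // Illustrative helper: read an aggregated job-level counter, guarding
  // against counters that have not been incremented yet.
  static long slotMillisMaps(AppContext context, JobId jobId) {
    Counter c = context.getJob(jobId).getCounters()
        .getCounter(JobCounter.SLOTS_MILLIS_MAPS);
    return c == null ? 0L : c.getValue();
  }
}
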
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jul 26 06:40:26 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -73,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -131,6 +133,8 @@ public abstract class TaskAttemptImpl im
 
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
+  private static final int MAP_MEMORY_MB_DEFAULT = 1024;
+  private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
   private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   protected final Configuration conf;
@@ -470,9 +474,9 @@ public abstract class TaskAttemptImpl im
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
     int memory = 1024;
     if (taskType == TaskType.MAP)  {
-      memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
+      memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, MAP_MEMORY_MB_DEFAULT);
     } else if (taskType == TaskType.REDUCE) {
-      memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
+      memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, REDUCE_MEMORY_MB_DEFAULT);
     }
     
     return memory;
@@ -903,6 +907,42 @@ public abstract class TaskAttemptImpl im
       finishTime = clock.getTime();
     }
   }
+  
+  private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+    int slotMemoryReq =
+        taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+    int simSlotsRequired =
+        slotMemoryReq
+            / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
+                : REDUCE_MEMORY_MB_DEFAULT);
+    // Simulating MRv1 slots for counters by assuming *_MEMORY_MB_DEFAULT
+    // corresponds to an MRv1 slot.
+    // Fallow slot millis is not applicable in MRv2, since a container is
+    // either assigned the required memory or is not. No partial
+    // reservations.
+    long slotMillisIncrement =
+        simSlotsRequired
+            * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
+    return slotMillisIncrement;
+  }
+  
+  private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
+      TaskAttemptImpl taskAttempt) {
+    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
+    
+    long slotMillisIncrement = computeSlotMillis(taskAttempt);
+    
+    if (taskType == TaskType.MAP) {
+      jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+    } else {
+      jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+    }
+    return jce;
+  }
 
   private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
       TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
@@ -1080,8 +1120,11 @@ public abstract class TaskAttemptImpl im
           break;
       }
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, finalState);
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                finalState);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       } else {
@@ -1106,6 +1149,15 @@ public abstract class TaskAttemptImpl im
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
                                                                   // Costly?
+      JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+              .getJobId());
+      jce.addCounterUpdate(
+          taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+              JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
+              , 1);
+      taskAttempt.eventHandler.handle(jce);
+      
       TaskAttemptStartedEvent tase =
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
@@ -1163,24 +1215,22 @@ public abstract class TaskAttemptImpl im
       String taskType = 
           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
       LOG.info("In TaskAttemptImpl taskType: " + taskType);
+      long slotMillis = computeSlotMillis(taskAttempt);
+      JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+              .getJobId());
+      jce.addCounterUpdate(
+        taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+          JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+          slotMillis);
+      taskAttempt.eventHandler.handle(jce);
       taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
-          /*
-      TaskAttemptFinishedEvent tfe =
-          new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
-          TaskAttemptState.SUCCEEDED.toString(), 
-          taskAttempt.reportedStatus.finishTime, "hostname", 
-          TaskAttemptState.SUCCEEDED.toString(), 
-          TypeConverter.fromYarn(taskAttempt.getCounters()));
-      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tfe));
-      */
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_SUCCEEDED));
       taskAttempt.eventHandler.handle
       (new SpeculatorEvent
           (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-
    }
   }
 
@@ -1190,9 +1240,13 @@ public abstract class TaskAttemptImpl im
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
       // set the finish time
       taskAttempt.setFinishTime();
+      
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.FAILED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
         // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
@@ -1245,9 +1299,13 @@ public abstract class TaskAttemptImpl im
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
       //set the finish time
       taskAttempt.setFinishTime();
+      
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.FAILED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
@@ -1268,8 +1326,11 @@ public abstract class TaskAttemptImpl im
       //set the finish time
       taskAttempt.setFinishTime();
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.KILLED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.KILLED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {

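computeSlotMillis() above approximates MRv1 slot usage from the container memory request: the requested megabytes are divided by the 1024 MB default to get a simulated slot count, which is multiplied by the attempt's wall-clock run time. A worked sketch with illustrative numbers (the constant mirrors MAP_MEMORY_MB_DEFAULT above; the request size and timestamps are assumptions for the example):

public class SlotMillisSketch {
  private static final int MAP_MEMORY_MB_DEFAULT = 1024; // as in TaskAttemptImpl

  public static void main(String[] args) {
    int requestedMapMemoryMb = 2048; // e.g. from MRJobConfig.MAP_MEMORY_MB
    long launchTime = 1000L;         // illustrative timestamps, in ms
    long finishTime = 61000L;

    int simSlots = requestedMapMemoryMb / MAP_MEMORY_MB_DEFAULT;  // 2 slots
    long slotMillis = simSlots * (finishTime - launchTime);       // 120000 ms
    System.out.println(slotMillis);  // value credited to SLOTS_MILLIS_MAPS
  }
}
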
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jul 26 06:40:26 2011
@@ -254,7 +254,7 @@ public abstract class TaskImpl implement
     this.jobToken = jobToken;
     this.metrics = metrics;
 
-    if (completedTasksFromPreviousRun != null 
+    if (completedTasksFromPreviousRun != null
         && completedTasksFromPreviousRun.contains(taskId)) {
       LOG.info("Task is from previous run " + taskId);
       startCount = startCount - 1;

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -22,11 +22,15 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -74,6 +78,15 @@ public class LocalContainerAllocator ext
       container.setContainerToken(null);
       container.setNodeHttpAddress("localhost:9999");
       // send the container-assigned event to task attempt
+
+      if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+        JobCounterUpdateEvent jce =
+            new JobCounterUpdateEvent(event.getAttemptID().getTaskId()
+                .getJobId());
+        // TODO Setting OTHER_LOCAL_MAPS for now.
+        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+        eventHandler.handle(jce);
+      }
       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
           event.getAttemptID(), container));
     }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Tue Jul 26 06:40:26 2011
@@ -46,6 +46,7 @@ public class ContainerRequestEvent exten
   public static ContainerRequestEvent createContainerRequestEventForFailedContainer(
       TaskAttemptId attemptID, 
       Resource capability) {
+    //ContainerRequest for failed events does not consider rack / node locality?
     return new ContainerRequestEvent(attemptID, capability);
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -34,12 +36,14 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -456,6 +460,7 @@ public class RMContainerAllocator extend
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
       new LinkedList<TaskAttemptId>();
     
+    /** Maps from a host to a list of Map tasks with data on the host */
     private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping = 
       new HashMap<String, LinkedList<TaskAttemptId>>();
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
@@ -501,6 +506,18 @@ public class RMContainerAllocator extend
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
       } else {
         for (String host : event.getHosts()) {
+          //host comes from data splitLocations which are hostnames. Containers
+          // use IP addresses.
+          //TODO Temporary fix for locality. Use resolvers from h-common. 
+          // Cache to make this more efficient ?
+          InetAddress addr = null;
+          try {
+            addr = InetAddress.getByName(host);
+          } catch (UnknownHostException e) {
+            LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
+          }
+          if (addr != null) //Fallback to host if resolve fails.
+            host = addr.getHostAddress();
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           if (list == null) {
             list = new LinkedList<TaskAttemptId>();
@@ -585,6 +602,7 @@ public class RMContainerAllocator extend
       //try to assign to earlierFailedMaps if present
       assigned = assignToFailedMap(allocated);
       
+      //Assign to reduces before assigning to maps ?
       if (assigned == null) {
         assigned = assignToReduce(allocated);
       }
@@ -606,6 +624,10 @@ public class RMContainerAllocator extend
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
+          JobCounterUpdateEvent jce =
+            new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+          eventHandler.handle(jce);
           LOG.info("Assigned from earlierFailedMaps");
           break;
         }
@@ -638,6 +660,10 @@ public class RMContainerAllocator extend
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
             assigned = maps.remove(tId);
+            JobCounterUpdateEvent jce =
+              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
+            eventHandler.handle(jce);
             hostLocalAssigned++;
             LOG.info("Assigned based on host match " + host);
             break;
@@ -650,6 +676,10 @@ public class RMContainerAllocator extend
             TaskAttemptId tId = list.removeFirst();
             if (maps.containsKey(tId)) {
               assigned = maps.remove(tId);
+              JobCounterUpdateEvent jce =
+                new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
+              eventHandler.handle(jce);
               rackLocalAssigned++;
               LOG.info("Assigned based on rack match " + rack);
               break;
@@ -658,6 +688,10 @@ public class RMContainerAllocator extend
           if (assigned == null && maps.size() > 0) {
             TaskAttemptId tId = maps.keySet().iterator().next();
             assigned = maps.remove(tId);
+            JobCounterUpdateEvent jce =
+              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+            eventHandler.handle(jce);
             LOG.info("Assigned based on * match");
             break;
           }

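The allocator changes above credit DATA_LOCAL_MAPS, RACK_LOCAL_MAPS or OTHER_LOCAL_MAPS depending on whether the assignment came from the host mapping, the rack mapping, or the fallback path, after normalizing split hostnames to IP addresses. A compact sketch of just that classification order, assuming the caller already has the candidate host and rack sets (all names illustrative, not the allocator's actual method):

import java.util.Set;

import org.apache.hadoop.mapreduce.JobCounter;

public class LocalityCounterSketch {
  // Illustrative only: host match first, then rack match, then any map.
  static JobCounter classify(String allocatedHost, String allocatedRack,
      Set<String> splitHosts, Set<String> splitRacks) {
    if (splitHosts.contains(allocatedHost)) {
      return JobCounter.DATA_LOCAL_MAPS;
    } else if (splitRacks.contains(allocatedRack)) {
      return JobCounter.RACK_LOCAL_MAPS;
    } else {
      return JobCounter.OTHER_LOCAL_MAPS;
    }
  }
}
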
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -469,7 +469,9 @@ public class TestRMContainerAllocator {
         return new EventHandler() {
           @Override
           public void handle(Event event) {
-            events.add((TaskAttemptContainerAssignedEvent) event);
+            if (event instanceof TaskAttemptContainerAssignedEvent) {
+              events.add((TaskAttemptContainerAssignedEvent) event);
+            } //Ignoring JobCounterUpdateEvents
           }
         };
       }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java Tue Jul 26 06:40:26 2011
@@ -11,4 +11,6 @@ public interface Counters {
   public abstract void setCounterGroup(String key, CounterGroup value);
   public abstract void removeCounterGroup(String key);
   public abstract void clearCounterGroups();
+  
+  public abstract void incrCounter(Enum<?> key, long amount);
 }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java Tue Jul 26 06:40:26 2011
@@ -78,6 +78,26 @@ public class CountersPBImpl extends Prot
     CounterGroup group = getCounterGroup(key.getDeclaringClass().getName());
     return group == null ? null : group.getCounter(key.name());
   }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    String groupName = key.getDeclaringClass().getName();
+    if (getCounterGroup(groupName) == null) {
+      CounterGroup cGrp = new CounterGroupPBImpl();
+      cGrp.setName(groupName);
+      cGrp.setDisplayName(groupName);
+      setCounterGroup(groupName, cGrp);
+    }
+    if (getCounterGroup(groupName).getCounter(key.name()) == null) {
+      Counter c = new CounterPBImpl();
+      c.setName(key.name());
+      c.setDisplayName(key.name());
+      c.setValue(0l);
+      getCounterGroup(groupName).setCounter(key.name(), c);
+    }
+    Counter counter = getCounterGroup(groupName).getCounter(key.name());
+    counter.setValue(counter.getValue() + amount);
+  }
  
   private void initCounterGroups() {
     if (this.counterGroups != null) {

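incrCounter() above lazily creates the counter group (named after the enum's declaring class) and the counter itself on first use, then adds the delta, so repeated calls accumulate. A minimal sketch of that behaviour using only APIs touched by this commit (the record instance comes from the RecordFactory, as in JobImpl.newCounters()):

import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

public class CounterAccumulationSketch {
  public static void main(String[] args) {
    Counters counters = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(Counters.class);
    counters.incrCounter(JobCounter.TOTAL_LAUNCHED_MAPS, 1); // creates group and counter
    counters.incrCounter(JobCounter.TOTAL_LAUNCHED_MAPS, 2); // accumulates
    // prints 3
    System.out.println(counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
  }
}
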
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue Jul 26 06:40:26 2011
@@ -48,7 +48,9 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -124,7 +126,6 @@ public class TestMRJobs {
   @Test
   public void testSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException { 
-
     LOG.info("\n\n\nStarting testSleepJob().");
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -148,17 +149,35 @@ public class TestMRJobs {
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-
+    verifySleepJobCounters(job);
+    
+    
     // TODO later:  add explicit "isUber()" checks of some sort (extend
     // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
   }
 
+  protected void verifySleepJobCounters(Job job) throws InterruptedException,
+      IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue() != 0);
+  }
+
   @Test
   public void testRandomWriter() throws IOException, InterruptedException,
       ClassNotFoundException {
-
+    
     LOG.info("\n\n\nStarting testRandomWriter().");
-
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
                + " not found. Not running test.");
@@ -169,8 +188,8 @@ public class TestMRJobs {
     mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
     mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
     Job job = randomWriterJob.createJob(mrCluster.getConfig());
-    Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
-        "random-output");
+    Path outputDir =
+        new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
     FileOutputFormat.setOutputPath(job, outputDir);
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(RandomTextWriterJob.class);
@@ -179,6 +198,7 @@ public class TestMRJobs {
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     // Make sure there are three files in the output-dir
+    
     RemoteIterator<FileStatus> iterator =
         FileContext.getFileContext(mrCluster.getConfig()).listStatus(
             outputDir);
@@ -191,9 +211,22 @@ public class TestMRJobs {
       }
     }
     Assert.assertEquals("Number of part files is wrong!", 3, count);
+    verifyRandomWriterCounters(job);
 
     // TODO later:  add explicit "isUber()" checks of some sort
   }
+  
+  protected void verifyRandomWriterCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+  }
 
   @Test
   public void testFailingMapper() throws IOException, InterruptedException,
@@ -227,9 +260,24 @@ public class TestMRJobs {
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED, 
         events[1].getStatus().FAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+    verifyFailingMapperCounters(job);
 
     // TODO later:  add explicit "isUber()" checks of some sort
   }
+  
+  protected void verifyFailingMapperCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+  }
 
   protected Job runFailingMapperJob()
   throws IOException, InterruptedException, ClassNotFoundException {

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue Jul 26 06:40:26 2011
@@ -25,7 +25,9 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -55,6 +57,32 @@ public class TestUberAM extends TestMRJo
     }
     super.testSleepJob();
   }
+  
+  @Override
+  protected void verifySleepJobCounters(Job job) throws InterruptedException,
+      IOException {
+    Counters counters = job.getCounters();
+
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue() != 0);
+
+    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
+        .getValue());
+    Assert.assertEquals(4,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+  }
 
   @Override
   public void testRandomWriter()
@@ -63,6 +91,17 @@ public class TestUberAM extends TestMRJo
   }
 
   @Override
+  protected void verifyRandomWriterCounters(Job job)
+      throws InterruptedException, IOException {
+    super.verifyRandomWriterCounters(job);
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(3,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+  }
+
+  @Override
   public void testFailingMapper()
   throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("\n\n\nStarting uberized testFailingMapper().");
@@ -100,9 +139,34 @@ public class TestUberAM extends TestMRJo
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
         events[0].getStatus().FAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+    
+    //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS
+    //verifyFailingMapperCounters(job);
 
     // TODO later:  add explicit "isUber()" checks of some sort
   }
+  
+  @Override
+  protected void verifyFailingMapperCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+
+    Assert.assertEquals(2,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(2, counters
+        .findCounter(JobCounter.NUM_FAILED_UBERTASKS).getValue());
+  }
 
 //@Test  //FIXME:  if/when the corresponding TestMRJobs test gets enabled, do so here as well (potentially with mods for ubermode)
   public void testSleepJobWithSecurityOn()