Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/20 02:34:02 UTC

svn commit: r1400351 [1/3] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...

Author: szetszwo
Date: Sat Oct 20 00:33:57 2012
New Revision: 1400351

URL: http://svn.apache.org/viewvc?rev=1400351&view=rev
Log:
Merge r1399946 through r1400349 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
      - copied unchanged from r1400349, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
      - copied unchanged from r1400349, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java
      - copied unchanged from r1400349, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskStateInternal.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/c++/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/block_forensics/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build-contrib.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/build.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/data_join/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/eclipse-plugin/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/vaidya/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/examples/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/webapps/job/   (props changed)

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1399946-1400349

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Sat Oct 20 00:33:57 2012
@@ -42,7 +42,8 @@ Trunk (Unreleased)
     MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
     client APIs cross MR1 and MR2 (Ahmed via tucu)
 
-    MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh)
+    MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks()
+    (XieXianshan via harsh)
 
     MAPREDUCE-3956. Remove the use of the deprecated Syncable.sync() method from
     TeraOutputFormat in the terasort example.  (szetszwo)
@@ -61,7 +62,11 @@ Trunk (Unreleased)
     MAPREDUCE-4371. Check for cyclic dependencies in Jobcontrol job DAG
     (madhukara phatak via bobby)
 
-    MAPREDUCE-4686. hadoop-mapreduce-client-core fails compilation in Eclipse due to missing Avro-generated classes (Chris Nauroth via harsh)
+    MAPREDUCE-4686. hadoop-mapreduce-client-core fails compilation in Eclipse 
+    due to missing Avro-generated classes (Chris Nauroth via harsh)
+
+    MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
+    (Brandon Li via suresh)
 
   BUG FIXES
 
@@ -181,6 +186,9 @@ Release 2.0.3-alpha - Unreleased 
 
     MAPREDUCE-4654. TestDistCp is ignored. (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO.
+    (Brandon Li via suresh)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -565,6 +573,9 @@ Release 0.23.5 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
+    for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv) 
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -582,6 +593,12 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4721. Task startup time in JHS is same as job startup time.
     (Ravi Prakash via bobby)
 
+    MAPREDUCE-4479. Fix parameter order in assertEquals() in 
+    TestCombineInputFileFormat.java (Mariappan Asokan via bobby)
+
+    MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
+    reducers complete consecutively. (Jason Lowe via vinodkv)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

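The MAPREDUCE-4733 entry above (implemented in the TaskAttemptListenerImpl and JobImpl hunks further down) addresses a starvation case in the shuffle: the reducer requests a window of the job's combined completion-event list and only the map completions in that window are handed back, so a window that happens to contain nothing but reduce completions comes back empty and, roughly speaking, the fetch index never advances past it. A minimal, self-contained sketch of the symptom and of the map-only-list remedy taken in this commit -- the class and enum names below are illustrative stand-ins, not the real MR types:

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: simplified stand-ins for the MR completion-event types. */
    public class MapEventWindowSketch {
      enum Kind { MAP, REDUCE }

      public static void main(String[] args) {
        // Combined per-job list: several reduce completions arrive back to back.
        List<Kind> allEvents = new ArrayList<Kind>();
        allEvents.add(Kind.MAP);
        allEvents.add(Kind.REDUCE);
        allEvents.add(Kind.REDUCE);
        allEvents.add(Kind.REDUCE);
        allEvents.add(Kind.MAP);      // a map completion the reducer still needs

        // Old approach: slice the combined list, then filter the slice for MAP.
        int fromEventId = 1, maxEvents = 3;
        List<Kind> window = allEvents.subList(fromEventId,
            Math.min(allEvents.size(), fromEventId + maxEvents));
        int mapsInWindow = 0;
        for (Kind k : window) {
          if (k == Kind.MAP) {
            mapsInWindow++;
          }
        }
        System.out.println("maps in window = " + mapsInWindow);          // prints 0

        // New approach: maintain a second, map-only list and index into it directly,
        // so a non-empty window is returned whenever unfetched map events remain.
        List<Kind> mapOnly = new ArrayList<Kind>();
        for (Kind k : allEvents) {
          if (k == Kind.MAP) {
            mapOnly.add(k);
          }
        }
        int startIndex = 1;
        List<Kind> mapWindow = mapOnly.subList(startIndex,
            Math.min(mapOnly.size(), startIndex + maxEvents));
        System.out.println("maps in map-only window = " + mapWindow.size()); // prints 1
      }
    }

Keeping a second list that holds only map completions means that, as long as any unfetched map events exist, a request against it returns at least one of them.
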
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1399946-1400349

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1399946-1400349

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sat Oct 20 00:33:57 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRa
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl ext
 
   @Override
   public MapTaskCompletionEventsUpdate getMapCompletionEvents(
-      JobID jobIdentifier, int fromEventId, int maxEvents,
+      JobID jobIdentifier, int startIndex, int maxEvents,
       TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
-        + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+        + ". startIndex " + startIndex + " maxEvents " + maxEvents);
 
     // TODO: shouldReset is never used. See TT. Ask for Removal.
     boolean shouldReset = false;
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
       TypeConverter.toYarn(taskAttemptID);
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
-        context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
-            fromEventId, maxEvents);
+        context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
+            startIndex, maxEvents);
 
     taskHeartbeatHandler.progressing(attemptID);
-
-    // filter the events to return only map completion events in old format
-    List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
-    for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) {
-      if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) {
-        mapEvents.add(TypeConverter.fromYarn(event));
-      }
-    }
     
     return new MapTaskCompletionEventsUpdate(
-        mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+        TypeConverter.fromYarn(events), shouldReset);
   }
 
   @Override

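After this hunk, getMapCompletionEvents is a thin wrapper: it asks the Job for a window of its map-only completion-event list (the renamed startIndex parameter indexes that list, not the combined one) and converts the whole window in a single TypeConverter call. The paging contract seen by the consumer keeps the same shape -- request a window, then advance the start index by the number of events actually returned. A rough, self-contained sketch of such a loop, using stand-in names rather than the real umbilical protocol or TypeConverter API:

    import java.util.Arrays;

    /** Illustrative paging loop; EventSource and its events are stand-ins, not Hadoop types. */
    public class EventPagingSketch {
      interface EventSource {
        /** Returns at most maxEvents events starting at startIndex of the map-only list. */
        String[] getMapCompletionEvents(int startIndex, int maxEvents);
      }

      public static void main(String[] args) {
        final String[] mapEvents = { "m_0", "m_1", "m_2", "m_3", "m_4" };

        EventSource source = new EventSource() {
          @Override
          public String[] getMapCompletionEvents(int startIndex, int maxEvents) {
            if (startIndex >= mapEvents.length) {
              return new String[0];
            }
            int end = Math.min(mapEvents.length, startIndex + maxEvents);
            return Arrays.copyOfRange(mapEvents, startIndex, end);
          }
        };

        // The consumer advances startIndex by the number of events actually returned.
        int startIndex = 0;
        String[] batch;
        while ((batch = source.getMapCompletionEvents(startIndex, 2)).length > 0) {
          System.out.println("fetched " + Arrays.toString(batch));
          startIndex += batch.length;
        }
      }
    }
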
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Sat Oct 20 00:33:57 2012
@@ -88,6 +88,9 @@ public interface Job {
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
 
+  TaskAttemptCompletionEvent[]
+      getMapAttemptCompletionEvents(int startIndex, int maxEvents);
+
   /**
    * @return information for MR AppMasters (previously failed and current)
    */

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Oct 20 00:33:57 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
@@ -189,6 +190,7 @@ public class JobImpl implements org.apac
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
+  private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
   private final List<String> diagnostics = new ArrayList<String>();
   
   //task/attempt related datastructures
@@ -210,163 +212,163 @@ public class JobImpl implements org.apac
       new UpdatedNodesTransition();
 
   protected static final
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
        stateMachineFactory
-     = new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
-              (JobState.NEW)
+     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+              (JobStateInternal.NEW)
 
           // Transitions from NEW state
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
-              (JobState.NEW,
-              EnumSet.of(JobState.INITED, JobState.FAILED),
+              (JobStateInternal.NEW,
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
               JobEventType.JOB_INIT,
               new InitTransition())
-          .addTransition(JobState.NEW, JobState.KILLED,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
-          .addTransition(JobState.NEW, JobState.ERROR,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.NEW, JobState.NEW,
+          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
               
           // Transitions from INITED state
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.INITED, JobState.RUNNING,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
               JobEventType.JOB_START,
               new StartTransition())
-          .addTransition(JobState.INITED, JobState.KILLED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillInitedJobTransition())
-          .addTransition(JobState.INITED, JobState.ERROR,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.INITED, JobState.INITED,
+          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_UPDATED_NODES)
               
           // Transitions from RUNNING state
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
-              (JobState.RUNNING,
-              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              (JobStateInternal.RUNNING,
+              EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
               JobEventType.JOB_COMPLETED,
               new JobNoTasksCompletedTransition())
-          .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_UPDATED_NODES,
               UPDATED_NODES_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_MAP_TASK_RESCHEDULED,
               new MapTaskRescheduledTransition())
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobState.RUNNING, JobState.RUNNING,
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               new TaskAttemptFetchFailureTransition())
           .addTransition(
-              JobState.RUNNING,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.RUNNING,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
-              (JobState.KILL_WAIT,
-              EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
+              (JobStateInternal.KILL_WAIT,
+              EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILL_WAIT,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILL_WAIT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from SUCCEEDED state
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.SUCCEEDED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.SUCCEEDED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from FAILED state
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.FAILED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.FAILED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.FAILED, JobState.FAILED,
+          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
-              JobState.KILLED,
-              JobState.ERROR, JobEventType.INTERNAL_ERROR,
+              JobStateInternal.KILLED,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
-          .addTransition(JobState.KILLED, JobState.KILLED,
+          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
-              JobState.ERROR,
-              JobState.ERROR,
+              JobStateInternal.ERROR,
+              JobStateInternal.ERROR,
               EnumSet.of(JobEventType.JOB_INIT,
                   JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_COMPLETED,
@@ -376,12 +378,12 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
-          .addTransition(JobState.ERROR, JobState.ERROR,
+          .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           // create the topology tables
           .installTopology();
  
-  private final StateMachine<JobState, JobEventType, JobEvent> stateMachine;
+  private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
 
   //changing fields while the job is running
   private int numMapTasks;
@@ -446,7 +448,7 @@ public class JobImpl implements org.apac
     stateMachine = stateMachineFactory.make(this);
   }
 
-  protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+  protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
     return stateMachine;
   }
 
@@ -520,9 +522,9 @@ public class JobImpl implements org.apac
     readLock.lock();
 
     try {
-      JobState state = getState();
-      if (state == JobState.ERROR || state == JobState.FAILED
-          || state == JobState.KILLED || state == JobState.SUCCEEDED) {
+      JobStateInternal state = getInternalState();
+      if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
+          || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
         this.mayBeConstructFinalFullCounters();
         return fullCounters;
       }
@@ -547,14 +549,28 @@ public class JobImpl implements org.apac
   @Override
   public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       int fromEventId, int maxEvents) {
+    return getAttemptCompletionEvents(taskAttemptCompletionEvents,
+        fromEventId, maxEvents);
+  }
+
+  @Override
+  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return getAttemptCompletionEvents(mapAttemptCompletionEvents,
+        startIndex, maxEvents);
+  }
+
+  private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+      List<TaskAttemptCompletionEvent> eventList,
+      int startIndex, int maxEvents) {
     TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
     readLock.lock();
     try {
-      if (taskAttemptCompletionEvents.size() > fromEventId) {
+      if (eventList.size() > startIndex) {
         int actualMax = Math.min(maxEvents,
-            (taskAttemptCompletionEvents.size() - fromEventId));
-        events = taskAttemptCompletionEvents.subList(fromEventId,
-            actualMax + fromEventId).toArray(events);
+            (eventList.size() - startIndex));
+        events = eventList.subList(startIndex,
+            actualMax + startIndex).toArray(events);
       }
       return events;
     } finally {
@@ -587,7 +603,7 @@ public class JobImpl implements org.apac
         diagsb.append(s).append("\n");
       }
 
-      if (getState() == JobState.NEW) {
+      if (getInternalState() == JobStateInternal.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
             cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
@@ -674,7 +690,7 @@ public class JobImpl implements org.apac
   public JobState getState() {
     readLock.lock();
     try {
-     return getStateMachine().getCurrentState();
+      return getExternalState(getStateMachine().getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -695,7 +711,7 @@ public class JobImpl implements org.apac
     LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
     try {
       writeLock.lock();
-      JobState oldState = getState();
+      JobStateInternal oldState = getInternalState();
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -706,9 +722,9 @@ public class JobImpl implements org.apac
             JobEventType.INTERNAL_ERROR));
       }
       //notify the eventhandler of state change
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(jobId + "Job Transitioned from " + oldState + " to "
-                 + getState());
+                 + getInternalState());
       }
     }
     
@@ -717,6 +733,25 @@ public class JobImpl implements org.apac
     }
   }
 
+  @Private
+  public JobStateInternal getInternalState() {
+    readLock.lock();
+    try {
+     return getStateMachine().getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  private static JobState getExternalState(JobStateInternal smState) {
+    if (smState == JobStateInternal.KILL_WAIT) {
+      return JobState.KILLED;
+    } else {
+      return JobState.valueOf(smState.name());
+    }
+  }
+  
+  
   //helpful in testing
   protected void addTask(Task task) {
     synchronized (tasksSyncHandle) {
@@ -757,7 +792,7 @@ public class JobImpl implements org.apac
     return FileSystem.get(conf);
   }
   
-  static JobState checkJobCompleteSuccess(JobImpl job) {
+  static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.tasks.size()) {
       try {
@@ -767,16 +802,16 @@ public class JobImpl implements org.apac
         LOG.error("Could not do commit for Job", e);
         job.addDiagnostic("Job commit failed: " + e.getMessage());
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       job.logJobHistoryFinishedEvent();
-      return job.finished(JobState.SUCCEEDED);
+      return job.finished(JobStateInternal.SUCCEEDED);
     }
     return null;
   }
 
-  JobState finished(JobState finalState) {
-    if (getState() == JobState.RUNNING) {
+  JobStateInternal finished(JobStateInternal finalState) {
+    if (getInternalState() == JobStateInternal.RUNNING) {
       metrics.endRunningJob(this);
     }
     if (finishTime == 0) setFinishTime();
@@ -989,7 +1024,7 @@ public class JobImpl implements org.apac
   */
 
   public static class InitTransition 
-      implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     /**
      * Note that this transition method is called directly (and synchronously)
@@ -999,7 +1034,7 @@ public class JobImpl implements org.apac
      * way; MR version is).
      */
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
       try {
@@ -1050,6 +1085,8 @@ public class JobImpl implements org.apac
         job.taskAttemptCompletionEvents =
             new ArrayList<TaskAttemptCompletionEvent>(
                 job.numMapTasks + job.numReduceTasks + 10);
+        job.mapAttemptCompletionEvents =
+            new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
 
         job.allowedMapFailuresPercent =
             job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
@@ -1065,7 +1102,7 @@ public class JobImpl implements org.apac
         createReduceTasks(job);
 
         job.metrics.endPreparingJob(job);
-        return JobState.INITED;
+        return JobStateInternal.INITED;
         //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
 
       } catch (IOException e) {
@@ -1074,7 +1111,7 @@ public class JobImpl implements org.apac
             + StringUtils.stringifyException(e));
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         job.metrics.endPreparingJob(job);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
     }
 
@@ -1282,9 +1319,9 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.KILLED.toString());
+              JobStateInternal.KILLED.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1294,7 +1331,7 @@ public class JobImpl implements org.apac
     public void transition(JobImpl job, JobEvent event) {
       job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
-      job.finished(JobState.KILLED);
+      job.finished(JobStateInternal.KILLED);
     }
   }
 
@@ -1321,6 +1358,9 @@ public class JobImpl implements org.apac
       //eventId is equal to index in the arraylist
       tce.setEventId(job.taskAttemptCompletionEvents.size());
       job.taskAttemptCompletionEvents.add(tce);
+      if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
+        job.mapAttemptCompletionEvents.add(tce);
+      }
       
       TaskAttemptId attemptId = tce.getAttemptId();
       TaskId taskId = attemptId.getTaskId();
@@ -1394,10 +1434,10 @@ public class JobImpl implements org.apac
   }
 
   private static class TaskCompletedTransition implements
-      MultipleArcTransition<JobImpl, JobEvent, JobState> {
+      MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.completedTaskCount++;
       LOG.info("Num completed Tasks: " + job.completedTaskCount);
       JobTaskEvent taskEvent = (JobTaskEvent) event;
@@ -1413,7 +1453,7 @@ public class JobImpl implements org.apac
       return checkJobForCompletion(job);
     }
 
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 > 
         job.allowedMapFailuresPercent*job.numMapTasks ||
@@ -1427,16 +1467,16 @@ public class JobImpl implements org.apac
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return job.finished(JobState.FAILED);
+        return job.finished(JobStateInternal.FAILED);
       }
       
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
 
     private void taskSucceeded(JobImpl job, Task task) {
@@ -1470,17 +1510,17 @@ public class JobImpl implements org.apac
 
   // Transition class for handling jobs with no tasks
   static class JobNoTasksCompletedTransition implements
-  MultipleArcTransition<JobImpl, JobEvent, JobState> {
+  MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
 
     @Override
-    public JobState transition(JobImpl job, JobEvent event) {
-      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
+      JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
       if (jobCompleteSuccess != null) {
         return jobCompleteSuccess;
       }
       
       // Return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1497,14 +1537,14 @@ public class JobImpl implements org.apac
   private static class KillWaitTaskCompletedTransition extends  
       TaskCompletedTransition {
     @Override
-    protected JobState checkJobForCompletion(JobImpl job) {
+    protected JobStateInternal checkJobForCompletion(JobImpl job) {
       if (job.completedTaskCount == job.tasks.size()) {
         job.setFinishTime();
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-        return job.finished(JobState.KILLED);
+        return job.finished(JobStateInternal.KILLED);
       }
       //return the current state, Job not finished yet
-      return job.getState();
+      return job.getInternalState();
     }
   }
 
@@ -1558,9 +1598,9 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobState.ERROR.toString());
+              JobStateInternal.ERROR.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobState.ERROR);
+      job.finished(JobStateInternal.ERROR);
     }
   }
 

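The JobImpl changes above are the Job half of MAPREDUCE-4596: the state machine now runs over JobStateInternal, and the public JobState is derived on demand by getExternalState, which collapses KILL_WAIT into KILLED and otherwise maps states by name. The TaskAttemptImpl hunks that follow apply the same split to task attempts, presumably so that internal transitional states can change without touching the wire-visible enums. A stripped-down, self-contained sketch of the mapping idea -- the enum values here are illustrative, not the exact Hadoop sets:

    /** Minimal sketch of the internal/external state split; enum values are illustrative. */
    public class StateSplitSketch {
      /** Fine-grained states the state machine actually runs over. */
      enum InternalState { NEW, INITED, RUNNING, KILL_WAIT, KILLED, FAILED, SUCCEEDED, ERROR }

      /** Coarser states exposed to clients; KILL_WAIT has no public counterpart. */
      enum ExternalState { NEW, INITED, RUNNING, KILLED, FAILED, SUCCEEDED, ERROR }

      static ExternalState toExternal(InternalState s) {
        if (s == InternalState.KILL_WAIT) {
          // Transitional internal state: clients simply see the job as killed.
          return ExternalState.KILLED;
        }
        // Every other internal state has an identically named external state.
        return ExternalState.valueOf(s.name());
      }

      public static void main(String[] args) {
        for (InternalState s : InternalState.values()) {
          System.out.println(s + " -> " + toExternal(s));
        }
      }
    }
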
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sat Oct 20 00:33:57 2012
@@ -39,6 +39,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,10 +73,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -88,7 +89,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.util.Build
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -184,149 +185,149 @@ public abstract class TaskAttemptImpl im
       = new DiagnosticInformationUpdater();
 
   private static final StateMachineFactory
-        <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
         stateMachineFactory
     = new StateMachineFactory
-             <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-           (TaskAttemptState.NEW)
+             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
+           (TaskAttemptStateInternal.NEW)
 
      // Transitions from the NEW state.
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
          TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
          TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new KilledTransition())
-     .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
 
      // Transitions from the UNASSIGNED state.
-     .addTransition(TaskAttemptState.UNASSIGNED,
-         TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
          new ContainerAssignedTransition())
-     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
-             TaskAttemptState.KILLED, true))
-     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED,
+             TaskAttemptStateInternal.KILLED, true))
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
-             TaskAttemptState.FAILED, true))
+             TaskAttemptStateInternal.FAILED, true))
 
      // Transitions from the ASSIGNED state.
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
          new LaunchedContainerTransition())
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
-         new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
-     .addTransition(TaskAttemptState.ASSIGNED,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
+     .addTransition(TaskAttemptStateInternal.ASSIGNED,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
       // ^ If RM kills the container due to expiry, preemption etc. 
-     .addTransition(TaskAttemptState.ASSIGNED, 
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.ASSIGNED, 
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.ASSIGNED, 
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from RUNNING state.
-     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
-     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+     .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // If no commit is required, task directly goes to success
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
      // If commit is required, task goes through commit pending state.
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.COMMIT_PENDING,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
      // Failure handling while RUNNING
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
       //for handling container exit without sending the done or fail msg
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
      // Timeout handling while RUNNING
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
      // Kill handling
-     .addTransition(TaskAttemptState.RUNNING,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from COMMIT_PENDING state
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
          new StatusUpdater())
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.COMMIT_PENDING,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.COMMIT_PENDING,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
      // if container killed by AM shutting down
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
-     .addTransition(TaskAttemptState.COMMIT_PENDING,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container
-     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
+     .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
          new SucceededTransition())
      .addTransition(
-          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+          TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
           TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
           DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Transitions from FAIL_CONTAINER_CLEANUP state.
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -339,17 +340,17 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_TIMED_OUT))
 
       // Transitions from KILL_CONTAINER_CLEANUP
-     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
-     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events
      .addTransition(
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
-         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+         TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -362,16 +363,16 @@ public abstract class TaskAttemptImpl im
 
      // Transitions from FAIL_TASK_CLEANUP
      // run the task cleanup
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
          new FailedTransition())
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
       // Ignore-able events
-     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
-         TaskAttemptState.FAIL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
+         TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -384,16 +385,16 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
      // Transitions from KILL_TASK_CLEANUP
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
          new KilledTransition())
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events
-     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
-         TaskAttemptState.KILL_TASK_CLEANUP,
+     .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
+         TaskAttemptStateInternal.KILL_TASK_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
@@ -406,31 +407,31 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
       // Transitions from SUCCEEDED
-     .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
-         TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
+         TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
          new TooManyFetchFailureTransition())
-      .addTransition(TaskAttemptState.SUCCEEDED,
-          EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED),
+      .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+          EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
           TaskAttemptEventType.TA_KILL, 
           new KilledAfterSuccessTransition())
      .addTransition(
-         TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
+         TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for SUCCEEDED state
-     .addTransition(TaskAttemptState.SUCCEEDED,
-         TaskAttemptState.SUCCEEDED,
+     .addTransition(TaskAttemptStateInternal.SUCCEEDED,
+         TaskAttemptStateInternal.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Transitions from FAILED state
-     .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
        TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
        DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for FAILED state
-     .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+     .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -445,11 +446,11 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
 
      // Transitions from KILLED state
-     .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for KILLED state
-     .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+     .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@@ -466,7 +467,7 @@ public abstract class TaskAttemptImpl im
      .installTopology();
 
   private final StateMachine
-         <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+         <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
     stateMachine;
 
   private ContainerId containerID;
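
The long addTransition(...) chain above, sealed with installTopology(), is the YARN state-machine builder idiom: each arc names a pre-state, a post-state (or an EnumSet of candidate post-states), an event type (or an EnumSet of event types), and an optional transition hook, and the finished factory stamps out one StateMachine per attempt. Below is a minimal, self-contained sketch of that pattern; ToyAttempt, ToyState, ToyEventType and ToyEvent are invented for illustration and are not part of this change, and the org.apache.hadoop.yarn.state signatures are assumed to be the ones this file already relies on.

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    public class ToyAttempt {
      enum ToyState { NEW, RUNNING, DONE }
      enum ToyEventType { START, FINISH }
      static class ToyEvent {
        final ToyEventType type;
        ToyEvent(ToyEventType type) { this.type = type; }
      }

      // Shared, immutable transition table; installTopology() seals it.
      private static final StateMachineFactory
          <ToyAttempt, ToyState, ToyEventType, ToyEvent> factory
        = new StateMachineFactory<ToyAttempt, ToyState, ToyEventType, ToyEvent>(
              ToyState.NEW)
          .addTransition(ToyState.NEW, ToyState.RUNNING, ToyEventType.START,
              new SingleArcTransition<ToyAttempt, ToyEvent>() {
                @Override
                public void transition(ToyAttempt attempt, ToyEvent event) {
                  // per-arc side effects go here (counters, history events, ...)
                }
              })
          .addTransition(ToyState.RUNNING, ToyState.DONE, ToyEventType.FINISH)
          .installTopology();

      // Each operand instance gets its own state machine over the shared table.
      private final StateMachine<ToyState, ToyEventType, ToyEvent> stateMachine =
          factory.make(this);

      public static void main(String[] args) throws Exception {
        ToyAttempt a = new ToyAttempt();
        a.stateMachine.doTransition(ToyEventType.START, new ToyEvent(ToyEventType.START));
        a.stateMachine.doTransition(ToyEventType.FINISH, new ToyEvent(ToyEventType.FINISH));
        System.out.println(a.stateMachine.getCurrentState());   // DONE
      }
    }

doTransition() is what handle() further down drives under the write lock, and getCurrentState() is what the new getInternalState() accessor exposes under the read lock.
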
@@ -874,9 +875,9 @@ public abstract class TaskAttemptImpl im
     readLock.lock();
     try {
       // TODO: Use stateMachine level method?
-      return (getState() == TaskAttemptState.SUCCEEDED || 
-          getState() == TaskAttemptState.FAILED ||
-          getState() == TaskAttemptState.KILLED);
+      return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED || 
+              getInternalState() == TaskAttemptStateInternal.FAILED ||
+              getInternalState() == TaskAttemptStateInternal.KILLED);
     } finally {
       readLock.unlock();
     }
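
isFinished() above, like getState(), getInternalState() and handle() later in this file, follows one locking discipline: every read of the machine's state takes the read lock, and doTransition() runs under the write lock. Here is a stripped-down sketch of that discipline; LockedCounter is an invented stand-in, not code from this patch.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockedCounter {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
      private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
      private long value;

      // Read path: many threads may query concurrently.
      public long get() {
        readLock.lock();
        try {
          return value;
        } finally {
          readLock.unlock();
        }
      }

      // Write path: mutations exclude readers only while they run.
      public void increment() {
        writeLock.lock();
        try {
          value++;
        } finally {
          writeLock.unlock();
        }
      }
    }
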
@@ -953,7 +954,7 @@ public abstract class TaskAttemptImpl im
   public TaskAttemptState getState() {
     readLock.lock();
     try {
-      return stateMachine.getCurrentState();
+      return getExternalState(stateMachine.getCurrentState());
     } finally {
       readLock.unlock();
     }
@@ -968,7 +969,7 @@ public abstract class TaskAttemptImpl im
     }
     writeLock.lock();
     try {
-      final TaskAttemptState oldState = getState();
+      final TaskAttemptStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -980,16 +981,58 @@ public abstract class TaskAttemptImpl im
         eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
             JobEventType.INTERNAL_ERROR));
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
           LOG.info(attemptId + " TaskAttempt Transitioned from " 
            + oldState + " to "
-           + getState());
+           + getInternalState());
       }
     } finally {
       writeLock.unlock();
     }
   }
 
+  @VisibleForTesting
+  public TaskAttemptStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static TaskAttemptState getExternalState(
+      TaskAttemptStateInternal smState) {
+    switch (smState) {
+    case ASSIGNED:
+    case UNASSIGNED:
+      return TaskAttemptState.STARTING;
+    case COMMIT_PENDING:
+      return TaskAttemptState.COMMIT_PENDING;
+    case FAILED:
+      return TaskAttemptState.FAILED;
+    case KILLED:
+      return TaskAttemptState.KILLED;
+      // All CLEANUP states are reported as RUNNING since events have not gone
+      // out to the Task yet. It may be possible to consider them a Finished state.
+    case FAIL_CONTAINER_CLEANUP:
+    case FAIL_TASK_CLEANUP:
+    case KILL_CONTAINER_CLEANUP:
+    case KILL_TASK_CLEANUP:
+    case SUCCESS_CONTAINER_CLEANUP:
+    case RUNNING:
+      return TaskAttemptState.RUNNING;
+    case NEW:
+      return TaskAttemptState.NEW;
+    case SUCCEEDED:
+      return TaskAttemptState.SUCCEEDED;
+    default:
+      throw new YarnException("Attempt to convert invalid "
+          + "stateMachineTaskAttemptState to externalTaskAttemptState: "
+          + smState);
+    }
+  }
+
   //always called in write lock
   private void setFinishTime() {
     //set the finish time only if launch time is set
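
getExternalState() above is where the new internal enum is folded back into the public TaskAttemptState: UNASSIGNED and ASSIGNED surface as STARTING, every *_CLEANUP sub-state surfaces as RUNNING, and the default arm throws so an unmapped internal state fails fast. The same collapse can be read as a table; the sketch below restates it that way, is illustration only, and assumes both import paths rather than taking them from this diff.

    import java.util.EnumMap;
    import java.util.Map;
    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
    import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;

    public class ExternalStateTable {
      private static final Map<TaskAttemptStateInternal, TaskAttemptState> TABLE =
          new EnumMap<TaskAttemptStateInternal, TaskAttemptState>(
              TaskAttemptStateInternal.class);
      static {
        TABLE.put(TaskAttemptStateInternal.NEW, TaskAttemptState.NEW);
        TABLE.put(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptState.STARTING);
        TABLE.put(TaskAttemptStateInternal.ASSIGNED, TaskAttemptState.STARTING);
        TABLE.put(TaskAttemptStateInternal.RUNNING, TaskAttemptState.RUNNING);
        // cleanup sub-states are invisible outside the AM; they report RUNNING
        TABLE.put(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptState.RUNNING);
        TABLE.put(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptState.RUNNING);
        TABLE.put(TaskAttemptStateInternal.FAIL_TASK_CLEANUP, TaskAttemptState.RUNNING);
        TABLE.put(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptState.RUNNING);
        TABLE.put(TaskAttemptStateInternal.KILL_TASK_CLEANUP, TaskAttemptState.RUNNING);
        TABLE.put(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING);
        TABLE.put(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptState.SUCCEEDED);
        TABLE.put(TaskAttemptStateInternal.FAILED, TaskAttemptState.FAILED);
        TABLE.put(TaskAttemptStateInternal.KILLED, TaskAttemptState.KILLED);
      }

      public static TaskAttemptState externalOf(TaskAttemptStateInternal s) {
        TaskAttemptState external = TABLE.get(s);
        if (external == null) {
          throw new IllegalStateException("No external mapping for " + s);
        }
        return external;
      }
    }

A switch with a throwing default, as the patch uses, has the same fail-fast property and keeps the mapping next to the state machine it describes; the table form is shown here only to make the collapse easy to scan.
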
@@ -1066,7 +1109,7 @@ public abstract class TaskAttemptImpl im
   private static
       TaskAttemptUnsuccessfulCompletionEvent
       createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
-          TaskAttemptState attemptState) {
+          TaskAttemptStateInternal attemptState) {
     TaskAttemptUnsuccessfulCompletionEvent tauce =
         new TaskAttemptUnsuccessfulCompletionEvent(
             TypeConverter.fromYarn(taskAttempt.attemptId),
@@ -1247,10 +1290,10 @@ public abstract class TaskAttemptImpl im
 
   private static class DeallocateContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    private final TaskAttemptState finalState;
+    private final TaskAttemptStateInternal finalState;
     private final boolean withdrawsContainerRequest;
     DeallocateContainerTransition
-        (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
+        (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
       this.finalState = finalState;
       this.withdrawsContainerRequest = withdrawsContainerRequest;
     }
@@ -1288,10 +1331,10 @@ public abstract class TaskAttemptImpl im
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 finalState);
-        if(finalState == TaskAttemptState.FAILED) {
+        if(finalState == TaskAttemptStateInternal.FAILED) {
           taskAttempt.eventHandler
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        } else if(finalState == TaskAttemptState.KILLED) {
+        } else if(finalState == TaskAttemptStateInternal.KILLED) {
           taskAttempt.eventHandler
           .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         }
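
DeallocateContainerTransition above shows another recurring idiom in this class: a single SingleArcTransition implementation is constructed with its final internal state (plus, here, a withdrawsContainerRequest flag) and registered on several arcs, so the FAILED and KILLED deallocation paths share one body and merely branch on the constructor argument. Below is a toy sketch of that parameterization; every Toy* name and RecordFinalState are invented for illustration, and only the SingleArcTransition interface is assumed, as used in this file.

    import org.apache.hadoop.yarn.state.SingleArcTransition;

    public class ParameterizedTransitionSketch {
      enum ToyState { FAILED, KILLED }
      static class ToyOperand { ToyState finalStateSeen; }
      static class ToyEvent { }

      // One hook class, reused on several arcs with different final states.
      static class RecordFinalState
          implements SingleArcTransition<ToyOperand, ToyEvent> {
        private final ToyState finalState;
        RecordFinalState(ToyState finalState) { this.finalState = finalState; }

        @Override
        public void transition(ToyOperand operand, ToyEvent event) {
          // a real transition would emit counter / history events here,
          // branching on finalState the way DeallocateContainerTransition does
          operand.finalStateSeen = finalState;
        }
      }

      public static void main(String[] args) {
        ToyOperand op = new ToyOperand();
        new RecordFinalState(ToyState.KILLED).transition(op, new ToyEvent());
        System.out.println(op.finalStateSeen);   // KILLED
      }
    }
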
@@ -1405,7 +1448,7 @@ public abstract class TaskAttemptImpl im
           JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
           slotMillis);
       taskAttempt.eventHandler.handle(jce);
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
+      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_SUCCEEDED));
@@ -1428,10 +1471,10 @@ public abstract class TaskAttemptImpl im
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.FAILED);
+                TaskAttemptStateInternal.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-        // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
+        // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
         // handling failed map/reduce events.
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
@@ -1443,7 +1486,7 @@ public abstract class TaskAttemptImpl im
   }
 
   @SuppressWarnings({ "unchecked" })
-  private void logAttemptFinishedEvent(TaskAttemptState state) {
+  private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return; 
     if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
@@ -1500,7 +1543,7 @@ public abstract class TaskAttemptImpl im
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.FAILED);
+                TaskAttemptStateInternal.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
@@ -1513,11 +1556,11 @@ public abstract class TaskAttemptImpl im
   }
   
   private static class KilledAfterSuccessTransition implements
-      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptState> {
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public TaskAttemptState transition(TaskAttemptImpl taskAttempt, 
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
       if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
        // after a reduce task has succeeded, its outputs are safe in HDFS.
@@ -1530,7 +1573,7 @@ public abstract class TaskAttemptImpl im
         // ignore this for reduce tasks
         LOG.info("Ignoring killed event for successful reduce task attempt" +
                   taskAttempt.getID().toString());
-        return TaskAttemptState.SUCCEEDED;
+        return TaskAttemptStateInternal.SUCCEEDED;
       }
       if(event instanceof TaskAttemptKillEvent) {
         TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
@@ -1545,12 +1588,12 @@ public abstract class TaskAttemptImpl im
       taskAttempt.eventHandler
           .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
       TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.KILLED);
+          taskAttempt, TaskAttemptStateInternal.KILLED);
       taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
           .getTaskId().getJobId(), tauce));
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
-      return TaskAttemptState.KILLED;
+      return TaskAttemptStateInternal.KILLED;
     }
   }
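
KilledAfterSuccessTransition is a MultipleArcTransition, so its post-state is decided at run time: a succeeded map attempt really becomes KILLED (its node-local output is gone with the container), while a succeeded reduce attempt ignores the kill and stays SUCCEEDED because its output is already safe in HDFS. The two outcomes match the EnumSet of candidate post-states declared for TA_KILL in the SUCCEEDED row of the transition table. Below is a toy sketch of that decision; all Toy* names are invented, and only the MultipleArcTransition interface is assumed, as used above.

    import org.apache.hadoop.yarn.state.MultipleArcTransition;

    public class KilledAfterSuccessSketch {
      enum ToyTaskType { MAP, REDUCE }
      enum ToyState { SUCCEEDED, KILLED }
      static class ToyAttempt {
        final ToyTaskType type;
        ToyAttempt(ToyTaskType type) { this.type = type; }
      }
      static class ToyEvent { }

      static class KilledAfterSuccess
          implements MultipleArcTransition<ToyAttempt, ToyEvent, ToyState> {
        @Override
        public ToyState transition(ToyAttempt attempt, ToyEvent event) {
          // reduce output already lives in HDFS, so the kill is ignored;
          // map output lived on the node, so the attempt really is killed
          return attempt.type == ToyTaskType.REDUCE
              ? ToyState.SUCCEEDED : ToyState.KILLED;
        }
      }

      public static void main(String[] args) {
        KilledAfterSuccess t = new KilledAfterSuccess();
        System.out.println(t.transition(new ToyAttempt(ToyTaskType.MAP), new ToyEvent()));      // KILLED
        System.out.println(t.transition(new ToyAttempt(ToyTaskType.REDUCE), new ToyEvent()));   // SUCCEEDED
      }
    }

The EnumSet on the arc declares which post-states the hook may return, which keeps the table at the top of the class the single place to read reachable states from.
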
 
@@ -1568,14 +1611,14 @@ public abstract class TaskAttemptImpl im
             .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptState.KILLED);
+                TaskAttemptStateInternal.KILLED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
         		"generated for taskAttempt: " + taskAttempt.getID());
       }
-//      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure.
+//      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));