Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/14 01:06:06 UTC

svn commit: r1467713 [1/3] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-a...

Author: szetszwo
Date: Sat Apr 13 23:05:54 2013
New Revision: 1467713

URL: http://svn.apache.org/r1467713
Log:
Merging r1466653 through r1467712 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
      - copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
      - copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
      - copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/
      - copied from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/TestJobInfo.java
      - copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/TestJobInfo.java
Removed:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/ExampleDriver.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project:r1430995-1467533
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1466653-1467712

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Sat Apr 13 23:05:54 2013
@@ -14,10 +14,11 @@ Trunk (Unreleased)
     MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
     with poor implementations of Object#hashCode().  (Radim Kolar via cutting)
 
-  IMPROVEMENTS
+    HADOOP-8562. Enhancements to support Hadoop on Windows Server and Windows
+    Azure environments. (See breakdown of tasks below for subtasks and
+    contributors)
 
-    MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
-                    faster job submission. (amarrk)
+  IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
 
@@ -30,9 +31,6 @@ Trunk (Unreleased)
     MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests.
                     (Vinay Thota via amarrk)
 
-    MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
-                    tasks in Gridmix. (amarrk)
-
     MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent
     fair scheduler pools. (Ahmed Radwan via todd)
 
@@ -71,39 +69,14 @@ Trunk (Unreleased)
     MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
     (Brandon Li via suresh)
 
+    MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
+    (Srikanth Sundarrajan via amareshwari)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
     (Yu Gao via llu)
 
-    MAPREDUCE-4356. [Rumen] Provide access to the method
-                    ParsedTask.obtainTaskAttempts(). (ravigummadi)
-
-    MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for 
-                    map only jobs. (amarrk)
-
-    MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
-                    strings. (ravigummadi)
-
-    MAPREDUCE-4083. [Gridmix] NPE in cpu emulation. (amarrk)
-
-    MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
-                    become slow in some cases (ravigummadi).
-
-    MAPREDUCE-3953. [Gridmix] Gridmix throws NPE and does not simulate a
-                    job if the trace contains null taskStatus for a task.
-                    (ravigummadi)
-
-    MAPREDUCE-3829. [Gridmix] Gridmix should give better error message when
-                    input data directory already exists and -generate opton is
-                    given.(ravigummadi)
-
-    MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
-                    counter is wrong when compressed input is used.(ravigummadi)
-
-    MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
-                    sortFinished times when needed.(ravigummadi)
-
     MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
                      (Jason Lowe via bobby)
 
@@ -155,33 +128,34 @@ Trunk (Unreleased)
     MAPREDUCE-5012. Typo in javadoc for IdentityMapper class. (Adam Monsen
     via suresh)
 
-    MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
-    (Sandy Ryza via tomwhite)
+    MAPREDUCE-5078. TestMRAppMaster fails on Windows due to mismatched path
+    separators. (Chris Nauroth via sseth)
 
-Release 2.0.5-alpha - UNRELEASED
+    MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris
+    Nauroth via bikas)
 
-  INCOMPATIBLE CHANGES
+  BREAKDOWN OF HADOOP-8562 SUBTASKS
 
-  NEW FEATURES
+    MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
+    (Chris Nauroth via suresh)
 
-  IMPROVEMENTS
-
-    MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
-    acmurthy)
+    MAPREDUCE-4780. MapReduce distribution build fails on Windows.
+    (Chris Nauroth via suresh)
 
-  OPTIMIZATIONS
+    MAPREDUCE-4790. MapReduce build script would be more readable using abspath.
+    (Chris Nauroth via suresh)
 
-  BUG FIXES
+    MAPREDUCE-4869. Fix TestMapReduceChildJVM. (Chris Nauroth via acmurthy)
 
-    MAPREDUCE-5113. Streaming input/output types are ignored with java 
-    mapper/reducer. (sandyr via tucu)
+    MAPREDUCE-4870. Fix TestMRJobsWithHistoryService. (Chris Nauroth via acmurthy)
 
-    MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
+    MAPREDUCE-4983. Fixed various platform specific assumptions in various tests,
+    so that they can pass on Windows too. (Chris Nauroth via vinodkv)
 
-    MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
-    command from the RM. (Jian He via jlowe)
+    HADOOP-9372. Fix bad timeout annotations on tests.
+    (Arpit Agarwal via suresh)
 
-Release 2.0.4-beta - UNRELEASED
+Release 2.0.5-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
@@ -189,9 +163,41 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
+    tasks in Gridmix. (amarrk via tgraves)
+
     MAPREDUCE-5033. mapred shell script should respect usage flags
     (--help -help -h). (Andrew Wang via atm)
 
+    MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+    allocation on small clusters. (Bikas Saha via vinodkv)
+    
+    MAPREDUCE-4990. Construct debug strings conditionally in
+    ShuffleHandler.Shuffle#sendMapOutput(). (kkambatl via tucu)
+    
+    MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred
+    (Aleksey Gorshkov via bobby)
+
+    MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
+    acmurthy)
+
+    MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
+    faster job submission. (amarrk via tgraves)
+
+    MAPREDUCE-5079. Changes job recovery to restore state directly from job
+    history, instead of simulating state machine events.
+    (Jason Lowe and Robert Parker via sseth)
+
+    MAPREDUCE-4981. Add WordMean, WordMedian, WordStandardDeviation
+    to ExamplesDriver. (Plamen Jeliazkov via shv)
+
+    MAPREDUCE-5059. Change average merge time on Job overview page to be the
+    time delta between the end of the shuffle and the start of the reduce.
+    (Omkar Vinit Joshi via vinodkv)
+
+    MAPREDUCE-4985. Add compression option to TestDFSIO usage.
+    (Plamen Jeliazkov via shv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -218,15 +224,108 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-4693. History server should include counters for failed tasks.
+    (Xuan Gong via sseth)
+
+    MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does
+    not exist. (sandyr via tucu)
+
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy)
+    
+    MAPREDUCE-4571. TestHsWebServicesJobs fails on jdk7. (tgraves via tucu)
+
+    MAPREDUCE-4716. TestHsWebServicesJobsQuery.testJobsQueryStateInvalid
+    fails with jdk7. (tgraves via tucu)
+
+    MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream
+    does not close the wrapped InputStream.  (Chris Nauroth via szetszwo)
+
+    MAPREDUCE-3872. Fix event handling races in ContainerLauncherImpl.
+    (Robert Kanter via sseth)
+
+    MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. (Zhijie Shen via
+    vinodkv)
+
+    MAPREDUCE-3829. [Gridmix] Gridmix should give better error message when
+    input data directory already exists and -generate option is
+    given. (ravigummadi via tgraves)
+
+    MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
+    counter is wrong when compressed input is used. (ravigummadi via tgraves)
+
+    MAPREDUCE-3953. [Gridmix] Gridmix throws NPE and does not simulate a
+    job if the trace contains null taskStatus for a task.  (ravigummadi via 
+    tgraves)
+
+    MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
+    become slow in some cases (ravigummadi via tgraves).
+
+    MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related
+    code. (Karthik Kambatla via sseth)
+
+    MAPREDUCE-4083. [Gridmix] NPE in cpu emulation. (amarrk via tgraves)
+
+    MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for 
+    map only jobs. (amarrk via tgraves)
+
+    MAPREDUCE-4356. [Rumen] Provide access to the method
+    ParsedTask.obtainTaskAttempts(). (ravigummadi via tgraves)
+
+    MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
+    strings. (ravigummadi via tgraves) 
+
+    MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
+    sortFinished times when needed. (Ravi Gummadi via tgraves)
+
+    MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
+    Joshi via vinodkv)
+
+    MAPREDUCE-5113. Streaming input/output types are ignored with java 
+    mapper/reducer. (sandyr via tucu)
+
+    MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
+
+    MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
+    command from the RM. (Jian He via jlowe)
+
+    MAPREDUCE-5137. AM web UI: clicking on Map Task results in 500 error
+    (Thomas Graves via jlowe)
+
+    MAPREDUCE-5136. TestJobImpl->testJobNoTasks fails with IBM JAVA (Amir
+    Sanjar via jlowe)
+
+    MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
+    YARN-486. (Xuan Gong via vinodkv)
+
+Release 2.0.4-alpha - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
+    (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
+    submitting a job (Daryn Sharp via cos)
+
     MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
     fix failures in renewal of HistoryServer's delegations tokens. (Siddharth
     Seth via vinodkv)
 
-    MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is
-    submitting a job (Daryn Sharp via cos)
+    MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
+    actual cluster (Siddharth Seth via hitesh)
 
-    MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
-    Joshi via vinodkv)
+    MAPREDUCE-5094. Disabled memory monitoring by default in MiniMRYarnCluster
+    to avoid some downstream tests failing. (Siddharth Seth via vinodkv)
 
 Release 2.0.3-alpha - 2013-02-06 
 
@@ -744,6 +843,18 @@ Release 2.0.0-alpha - 05-23-2012
     MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
     bad (Jason Lowe via bobby)
 
+Release 0.23.8 - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -758,6 +869,12 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi
     Prakash via jlowe)
 
+    MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
+    (Robert Parker via jeagles)
+    
+    MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory
+    (Aleksey Gorshkov via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
@@ -777,11 +894,36 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
     the value from the Task commitAttempt member (Robert Parker via jeagles)
 
+    MAPREDUCE-4871. AM uses mapreduce.jobtracker.split.metainfo.maxsize but
+    mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
+    jeagles)
+
+    MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
+    shutdown (Jason Lowe via jeagles)
+
+    MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
+    backup and eventually OOM (Jason Lowe via bobby)
+
+    MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
+    Prakash via tgraves)
+
+    MAPREDUCE-5060. Fetch failures that time out only count against the first
+    map task (Robert Joseph Evans via jlowe)
+
+    MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
+    (Jason Lowe via bobby)
+
+    MAPREDUCE-5053. java.lang.InternalError from decompression codec causes
+    reducer to fail (Robert Parker via jeagles)
+
     MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
 
     MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey 
     Gorshkov via tgraves)
 
+    MAPREDUCE-5137. AM web UI: clicking on Map Task results in 500 error
+    (Thomas Graves via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt:r1430995-1467533
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1466653-1467712

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1466653-1467712
  Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/conf:r1430995-1467533

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Apr 13 23:05:54 2013
@@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -167,7 +175,6 @@ public class MRAppMaster extends Composi
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
-  private Recovery recoveryServ;
   private ContainerAllocator containerAllocator;
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
@@ -180,7 +187,6 @@ public class MRAppMaster extends Composi
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
-  private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
 
   private Job job;
@@ -193,6 +199,8 @@ public class MRAppMaster extends Composi
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
 
+  private long recoveredJobStartTime = 0;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime, int maxAppAttempts) {
@@ -340,34 +348,9 @@ public class MRAppMaster extends Composi
       }
     } else {
       committer = createOutputCommitter(conf);
-      boolean recoveryEnabled = conf.getBoolean(
-          MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-      boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-
-      // If a shuffle secret was not provided by the job client then this app
-      // attempt will generate one.  However that disables recovery if there
-      // are reducers as the shuffle secret would be app attempt specific.
-      boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-          TokenCache.getShuffleSecretKey(fsTokens) != null);
-
-      if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
-        LOG.info("Recovery is enabled. "
-            + "Will try to recover from previous life on best effort basis.");
-        recoveryServ = createRecoveryService(context);
-        addIfService(recoveryServ);
-        dispatcher = recoveryServ.getDispatcher();
-        clock = recoveryServ.getClock();
-        inRecovery = true;
-      } else {
-        LOG.info("Not starting RecoveryService: recoveryEnabled: "
-            + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
-            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
-            + appAttemptID.getAttemptId());
-        dispatcher = createDispatcher();
-        addIfService(dispatcher);
-      }
+
+      dispatcher = createDispatcher();
+      addIfService(dispatcher);
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
@@ -595,15 +578,6 @@ public class MRAppMaster extends Composi
     return new JobFinishEventHandler();
   }
 
-  /**
-   * Create the recovery service.
-   * @return an instance of the recovery service.
-   */
-  protected Recovery createRecoveryService(AppContext appContext) {
-    return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter(), isNewApiCommitter());
-  }
-
   /** Create and initialize (but don't start) a single job. 
    * @param forcedState a state to force the job into or null for normal operation. 
    * @param diagnostic a diagnostic message to include with the job.
@@ -615,7 +589,8 @@ public class MRAppMaster extends Composi
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-            completedTasksFromPreviousRun, metrics, newApiCommitter,
+            completedTasksFromPreviousRun, metrics,
+            committer, newApiCommitter,
             currentUser.getUserName(), appSubmitTime, amInfos, context, 
             forcedState, diagnostic);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -978,18 +953,8 @@ public class MRAppMaster extends Composi
   public void start() {
 
     amInfos = new LinkedList<AMInfo>();
-
-    // Pull completedTasks etc from recovery
-    if (inRecovery) {
-      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
-      amInfos = recoveryServ.getAMInfos();
-    } else {
-      // Get the amInfos anyways irrespective of whether recovery is enabled or
-      // not IF this is not the first AM generation
-      if (appAttemptID.getAttemptId() != 1) {
-        amInfos.addAll(readJustAMInfos());
-      }
-    }
+    completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
+    processRecovery();
 
     // Create an AMInfo for the current AM generation.
     AMInfo amInfo =
@@ -1051,13 +1016,105 @@ public class MRAppMaster extends Composi
     startJobs();
   }
 
+  private void processRecovery() {
+    if (appAttemptID.getAttemptId() == 1) {
+      return;  // no need to recover on the first attempt
+    }
+
+    boolean recoveryEnabled = getConfig().getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+    boolean recoverySupportedByCommitter =
+        committer != null && committer.isRecoverySupported();
+
+    // If a shuffle secret was not provided by the job client then this app
+    // attempt will generate one.  However that disables recovery if there
+    // are reducers as the shuffle secret would be app attempt specific.
+    int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
+        TokenCache.getShuffleSecretKey(fsTokens) != null);
+
+    if (recoveryEnabled && recoverySupportedByCommitter
+          && shuffleKeyValidForRecovery) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      try {
+        parsePreviousJobHistory();
+      } catch (IOException e) {
+        LOG.warn("Unable to parse prior job history, aborting recovery", e);
+        // try to get just the AMInfos
+        amInfos.addAll(readJustAMInfos());
+      }
+    } else {
+      LOG.info("Will not try to recover. recoveryEnabled: "
+            + recoveryEnabled + " recoverySupportedByCommitter: "
+            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+            + appAttemptID.getAttemptId());
+      // Get the amInfos anyways whether recovery is enabled or not
+      amInfos.addAll(readJustAMInfos());
+    }
+  }
+
+  private static FSDataInputStream getPreviousJobHistoryStream(
+      Configuration conf, ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
+        appAttemptId);
+    LOG.info("Previous history file is at " + historyFile);
+    return historyFile.getFileSystem(conf).open(historyFile);
+  }
+
+  private void parsePreviousJobHistory() throws IOException {
+    FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
+        appAttemptID);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file" +
+          ", ignoring incomplete events.", parseException);
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
+        completedTasksFromPreviousRun
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasksFromPreviousRun.size());
+    recoveredJobStartTime = jobInfo.getLaunchTime();
+
+    // recover AMInfos
+    List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
+    if (jhAmInfoList != null) {
+      for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
+        AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+            jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+            jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+            jhAmInfo.getNodeManagerHttpPort());
+        amInfos.add(amInfo);
+      }
+    }
+  }
+
   private List<AMInfo> readJustAMInfos() {
     List<AMInfo> amInfos = new ArrayList<AMInfo>();
     FSDataInputStream inputStream = null;
     try {
-      inputStream =
-          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
-            appAttemptID);
+      inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
       EventReader jobHistoryEventReader = new EventReader(inputStream);
 
       // All AMInfos are contiguous. Track when the first AMStartedEvent
@@ -1108,7 +1165,8 @@ public class MRAppMaster extends Composi
   @SuppressWarnings("unchecked")
   protected void startJobs() {
     /** create a job-start event to get this ball rolling */
-    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+    JobEvent startJobEvent = new JobStartEvent(job.getID(),
+        recoveredJobStartTime);
     /** send the job-start event. this triggers the job execution. */
     dispatcher.getEventHandler().handle(startJobEvent);
   }

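The MRAppMaster changes above are the heart of MAPREDUCE-5079: the old RecoveryService (which replayed state machine events) is removed, and the AM now parses the previous attempt's job-history file directly in parsePreviousJobHistory(). Below is a minimal standalone sketch of that parsing step, built only from the JobHistoryParser calls visible in the diff; the command-line path argument and the bare Configuration are illustrative assumptions, since the real code resolves the file through JobHistoryUtils.getPreviousJobHistoryPath().

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

    public class JhistRecoverySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative: args[0] is a prior attempt's .jhist file; the AM
        // resolves this via JobHistoryUtils.getPreviousJobHistoryPath().
        Path historyFile = new Path(args[0]);
        FSDataInputStream in = historyFile.getFileSystem(conf).open(historyFile);
        JobHistoryParser parser = new JobHistoryParser(in);
        JobInfo jobInfo = parser.parse();
        // A partially written history is tolerated: recovery proceeds with
        // whatever events parsed cleanly, as in parsePreviousJobHistory().
        if (parser.getParseException() != null) {
          System.err.println("Ignoring incomplete events: "
              + parser.getParseException());
        }
        Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
        for (Map.Entry<TaskID, TaskInfo> e : tasks.entrySet()) {
          // The AM compares against TaskState.SUCCEEDED.toString()
          if ("SUCCEEDED".equals(e.getValue().getTaskStatus())) {
            System.out.println("Recoverable task: " + e.getKey());
          }
        }
        System.out.println("Recovered job start time: " + jobInfo.getLaunchTime());
      }
    }

Each task recovered this way is later fed a TaskRecoverEvent instead of being rescheduled from scratch, which is what the JobImpl changes below wire up.
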
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Sat Apr 13 23:05:54 2013
@@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
   //Producer:Task
   TA_SCHEDULE,
   TA_RESCHEDULE,
+  TA_RECOVER,
 
   //Producer:Client, Task
   TA_KILL,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java Sat Apr 13 23:05:54 2013
@@ -28,6 +28,7 @@ public enum TaskEventType {
 
   //Producer:Job
   T_SCHEDULE,
+  T_RECOVER,
 
   //Producer:Speculator
   T_ADD_SPEC_ATTEMPT,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Apr 13 23:05:54 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -159,6 +162,7 @@ public class JobImpl implements org.apac
   private final Lock writeLock;
   private final JobId jobId;
   private final String jobName;
+  private final OutputCommitter committer;
   private final boolean newApiCommitter;
   private final org.apache.hadoop.mapreduce.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
@@ -602,7 +606,7 @@ public class JobImpl implements org.apac
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
       Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
-      boolean newApiCommitter, String userName,
+      OutputCommitter committer, boolean newApiCommitter, String userName,
       long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
       JobStateInternal forcedState, String forcedDiagnostic) {
     this.applicationAttemptId = applicationAttemptId;
@@ -618,6 +622,7 @@ public class JobImpl implements org.apac
     this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
     this.appSubmitTime = appSubmitTime;
     this.oldJobId = TypeConverter.fromYarn(jobId);
+    this.committer = committer;
     this.newApiCommitter = newApiCommitter;
 
     this.taskAttemptListener = taskAttemptListener;
@@ -888,10 +893,16 @@ public class JobImpl implements org.apac
     }
   }
 
-  protected void scheduleTasks(Set<TaskId> taskIDs) {
+  protected void scheduleTasks(Set<TaskId> taskIDs,
+      boolean recoverTaskOutput) {
     for (TaskId taskID : taskIDs) {
-      eventHandler.handle(new TaskEvent(taskID, 
-          TaskEventType.T_SCHEDULE));
+      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
+      if (taskInfo != null) {
+        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
+            committer, recoverTaskOutput));
+      } else {
+        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
+      }
     }
   }
 
@@ -1421,7 +1432,7 @@ public class JobImpl implements org.apac
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
                 job.jobToken, job.fsTokens,
-                job.clock, job.completedTasksFromPreviousRun, 
+                job.clock,
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
         job.addTask(task);
@@ -1439,7 +1450,6 @@ public class JobImpl implements org.apac
                 job.conf, job.numMapTasks, 
                 job.taskAttemptListener, job.jobToken,
                 job.fsTokens, job.clock,
-                job.completedTasksFromPreviousRun, 
                 job.applicationAttemptId.getAttemptId(),
                 job.metrics, job.appContext);
         job.addTask(task);
@@ -1475,8 +1485,8 @@ public class JobImpl implements org.apac
     @Override
     public void transition(JobImpl job, JobEvent event) {
       job.setupProgress = 1.0f;
-      job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
-      job.scheduleTasks(job.reduceTasks);
+      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
+      job.scheduleTasks(job.reduceTasks, true);
 
       // If we have no tasks, just transition to job completed
       if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
@@ -1507,7 +1517,12 @@ public class JobImpl implements org.apac
      */
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      job.startTime = job.clock.getTime();
+      JobStartEvent jse = (JobStartEvent) event;
+      if (jse.getRecoveredJobStartTime() != 0) {
+        job.startTime = jse.getRecoveredJobStartTime();
+      } else {
+        job.startTime = job.clock.getTime();
+      }
       JobInitedEvent jie =
         new JobInitedEvent(job.oldJobId,
              job.startTime,

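Taken together, the JobImpl hunks above make scheduleTasks() the fork between recovery and fresh scheduling: a task found in completedTasksFromPreviousRun gets a TaskRecoverEvent carrying the committer and a recoverTaskOutput flag, everything else gets the usual T_SCHEDULE. Output recovery is requested for maps only when the job has no reducers (only map-only jobs commit final map output), and always for reduces. Here is a runnable toy model of just that decision; every identifier in it is a stand-in, not a real MR app type, with the corresponding JobImpl calls noted in comments.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    public class ScheduleTasksSketch {
      // Stand-in for the TaskId -> TaskInfo map recovered from job history
      private final Map<String, String> completedTasksFromPreviousRun =
          new HashMap<String, String>();

      void scheduleTasks(Set<String> taskIDs, boolean recoverTaskOutput) {
        for (String taskID : taskIDs) {
          String taskInfo = completedTasksFromPreviousRun.remove(taskID);
          if (taskInfo != null) {
            // JobImpl: eventHandler.handle(new TaskRecoverEvent(taskID,
            //     taskInfo, committer, recoverTaskOutput));
            System.out.println("T_RECOVER  " + taskID
                + " recoverOutput=" + recoverTaskOutput);
          } else {
            // JobImpl: eventHandler.handle(
            //     new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
            System.out.println("T_SCHEDULE " + taskID);
          }
        }
      }

      public static void main(String[] args) {
        ScheduleTasksSketch job = new ScheduleTasksSketch();
        job.completedTasksFromPreviousRun.put("task_m_000000", "SUCCEEDED");
        Set<String> maps = new LinkedHashSet<String>(
            Arrays.asList("task_m_000000", "task_m_000001"));
        Set<String> reduces = new LinkedHashSet<String>(
            Arrays.asList("task_r_000000"));
        // Mirrors the new call sites: maps recover output only when the job
        // has no reducers; reduces always attempt output recovery.
        job.scheduleTasks(maps, reduces.isEmpty());
        job.scheduleTasks(reduces, true);
      }
    }

Printing stands in for the dispatcher here; the point is that recovery is now decided per task at scheduling time, rather than by a separate recovery service owning the dispatcher and clock.
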
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Sat Apr 13 23:05:54 2013
@@ -18,17 +18,13 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImp
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
         conf, taskAttemptListener, jobToken, credentials, clock,
-        completedTasksFromPreviousRun, startCount, metrics, appContext);
+        appAttemptId, metrics, appContext);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Sat Apr 13 23:05:54 2013
@@ -18,16 +18,12 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -47,11 +43,10 @@ public class ReduceTaskImpl extends Task
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
         taskAttemptListener, jobToken, credentials, clock,
-        completedTasksFromPreviousRun, startCount, metrics, appContext);
+        appAttemptId, metrics, appContext);
     this.numMapTasks = numMapTasks;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sat Apr 13 23:05:54 2013
@@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
@@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -111,6 +114,7 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -205,6 +209,11 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
      .addTransition(TaskAttemptStateInternal.NEW,
+         EnumSet.of(TaskAttemptStateInternal.FAILED,
+             TaskAttemptStateInternal.KILLED,
+             TaskAttemptStateInternal.SUCCEEDED),
+         TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+     .addTransition(TaskAttemptStateInternal.NEW,
           TaskAttemptStateInternal.NEW,
           TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
           DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -759,8 +768,8 @@ public abstract class TaskAttemptImpl im
     // The null fields are per-container and will be constructed for each
     // container separately.
     ContainerLaunchContext container = BuilderUtils
-        .newContainerLaunchContext(null, conf
-            .get(MRJobConfig.USER_NAME), null, localResources,
+        .newContainerLaunchContext(conf
+            .get(MRJobConfig.USER_NAME), localResources,
             environment, null, serviceData, taskCredentialsBuffer,
             applicationACLs);
 
@@ -769,10 +778,9 @@ public abstract class TaskAttemptImpl im
 
   static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      ContainerId containerID, Configuration conf,
-      Token<JobTokenIdentifier> jobToken, Task remoteTask,
+      Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
       final org.apache.hadoop.mapred.JobID oldJobId,
-      Resource assignedCapability, WrappedJvmID jvmID,
+      WrappedJvmID jvmID,
       TaskAttemptListener taskAttemptListener,
       Credentials credentials) {
 
@@ -805,7 +813,7 @@ public abstract class TaskAttemptImpl im
 
     // Construct the actual Container
     ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
-        containerID, commonContainerSpec.getUser(), assignedCapability,
+        commonContainerSpec.getUser(),
         commonContainerSpec.getLocalResources(), myEnv, commands,
         myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
         applicationACLs);
@@ -1082,6 +1090,102 @@ public abstract class TaskAttemptImpl im
     this.avataar = avataar;
   }
   
+  @SuppressWarnings("unchecked")
+  public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
+      OutputCommitter committer, boolean recoverOutput) {
+    containerID = taInfo.getContainerId();
+    containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+        + taInfo.getPort());
+    containerMgrAddress = StringInterner.weakIntern(
+        containerNodeId.toString());
+    nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+        + taInfo.getHttpPort());
+    computeRackAndLocality();
+    launchTime = taInfo.getStartTime();
+    finishTime = (taInfo.getFinishTime() != -1) ?
+        taInfo.getFinishTime() : clock.getTime();
+    shufflePort = taInfo.getShufflePort();
+    trackerName = taInfo.getHostname();
+    httpPort = taInfo.getHttpPort();
+    sendLaunchedEvents();
+
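+    // Rebuild the attempt's reported status from the recovered history record.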
+    reportedStatus.id = attemptId;
+    reportedStatus.progress = 1.0f;
+    reportedStatus.counters = taInfo.getCounters();
+    reportedStatus.stateString = taInfo.getState();
+    reportedStatus.phase = Phase.CLEANUP;
+    reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
+    reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
+    reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
+    addDiagnosticInfo(taInfo.getError());
+
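+    // For a successfully completed attempt, optionally let the committer
+    // recover its output; if recovery fails, downgrade the attempt to KILLED
+    // and clean up below.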
+    boolean needToClean = false;
+    String recoveredState = taInfo.getTaskStatus();
+    if (recoverOutput
+        && TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+      TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptId));
+      try {
+        committer.recoverTask(tac);
+        LOG.info("Recovered output from task attempt " + attemptId);
+      } catch (Exception e) {
+        LOG.error("Unable to recover task attempt " + attemptId, e);
+        LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
+        recoveredState = TaskAttemptState.KILLED.toString();
+        needToClean = true;
+      }
+    }
+
+    TaskAttemptStateInternal attemptState;
+    if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+      attemptState = TaskAttemptStateInternal.SUCCEEDED;
+      reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
+      eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
+      logAttemptFinishedEvent(attemptState);
+    } else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
+      attemptState = TaskAttemptStateInternal.FAILED;
+      reportedStatus.taskState = TaskAttemptState.FAILED;
+      eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(this,
+              TaskAttemptStateInternal.FAILED);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+    } else {
+      if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
+        if (String.valueOf(recoveredState).isEmpty()) {
+          LOG.info("TaskAttempt" + attemptId
+              + " had not completed, recovering as KILLED");
+        } else {
+          LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+              + recoveredState + ", recovering as KILLED");
+        }
+        addDiagnosticInfo("Killed during application recovery");
+        needToClean = true;
+      }
+      attemptState = TaskAttemptStateInternal.KILLED;
+      reportedStatus.taskState = TaskAttemptState.KILLED;
+      eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(this,
+              TaskAttemptStateInternal.KILLED);
+      eventHandler.handle(
+          new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+    }
+
+    if (needToClean) {
+      TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptId));
+      try {
+        committer.abortTask(tac);
+      } catch (Exception e) {
+        LOG.warn("Task cleanup failed for attempt " + attemptId, e);
+      }
+    }
+
+    return attemptState;
+  }
+
   private static TaskAttemptState getExternalState(
       TaskAttemptStateInternal smState) {
     switch (smState) {
@@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private void computeRackAndLocality() {
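+    // Resolve the container host's rack, then classify locality against the
+    // split's preferred hosts and racks: NODE_LOCAL, RACK_LOCAL, or OFF_SWITCH.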
+    nodeRackName = RackResolver.resolve(
+        containerNodeId.getHost()).getNetworkLocation();
+
+    locality = Locality.OFF_SWITCH;
+    if (dataLocalHosts.size() > 0) {
+      String cHost = resolveHost(containerNodeId.getHost());
+      if (dataLocalHosts.contains(cHost)) {
+        locality = Locality.NODE_LOCAL;
+      }
+    }
+    if (locality == Locality.OFF_SWITCH) {
+      if (dataLocalRacks.contains(nodeRackName)) {
+        locality = Locality.RACK_LOCAL;
+      }
+    }
+  }
+
   private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     int slotMemoryReq =
@@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl im
     return slotMillisIncrement;
   }
 
+  private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
+      TaskAttemptImpl taskAttempt) {
+    long slotMillis = computeSlotMillis(taskAttempt);
+    TaskId taskId = taskAttempt.attemptId.getTaskId();
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+    jce.addCounterUpdate(
+      taskId.getTaskType() == TaskType.MAP ?
+        JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+        slotMillis);
+    return jce;
+  }
+
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
       TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl im
     return tauce;
   }
 
+  @SuppressWarnings("unchecked")
+  private void sendLaunchedEvents() {
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
+        .getJobId());
+    jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+        JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
+    eventHandler.handle(jce);
+
+    LOG.info("TaskAttempt: [" + attemptId
+        + "] using containerId: [" + containerID + " on NM: ["
+        + containerMgrAddress + "]");
+    TaskAttemptStartedEvent tase =
+      new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
+          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+          launchTime, trackerName, httpPort, shufflePort, containerID,
+          locality.toString(), avataar.toString());
+    eventHandler.handle(
+        new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
+  }
+
   private WrappedProgressSplitsBlock getProgressSplitBlock() {
     readLock.lock();
     try {
@@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl im
           taskAttempt.containerNodeId.toString());
       taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
           cEvent.getContainer().getNodeHttpAddress());
-      taskAttempt.nodeRackName = RackResolver.resolve(
-          taskAttempt.containerNodeId.getHost()).getNetworkLocation();
       taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
       taskAttempt.assignedCapability = cEvent.getContainer().getResource();
       // this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1354,32 +1506,18 @@ public abstract class TaskAttemptImpl im
       taskAttempt.taskAttemptListener.registerPendingTask(
           taskAttempt.remoteTask, taskAttempt.jvmID);
 
-      taskAttempt.locality = Locality.OFF_SWITCH;
-      if (taskAttempt.dataLocalHosts.size() > 0) {
-        String cHost = taskAttempt.resolveHost(
-            taskAttempt.containerNodeId.getHost());
-        if (taskAttempt.dataLocalHosts.contains(cHost)) {
-          taskAttempt.locality = Locality.NODE_LOCAL;
-        }
-      }
-      if (taskAttempt.locality == Locality.OFF_SWITCH) {
-        if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
-          taskAttempt.locality = Locality.RACK_LOCAL;
-        }
-      }
+      taskAttempt.computeRackAndLocality();
       
       //launch the container
       //create the container object to be launched for a given Task attempt
       ContainerLaunchContext launchContext = createContainerLaunchContext(
-          cEvent.getApplicationACLs(), taskAttempt.containerID,
-          taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
-          taskAttempt.oldJobId, taskAttempt.assignedCapability,
-          taskAttempt.jvmID, taskAttempt.taskAttemptListener,
-          taskAttempt.credentials);
+          cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
+          taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
+          taskAttempt.taskAttemptListener, taskAttempt.credentials);
       taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
           taskAttempt.attemptId, taskAttempt.containerID,
           taskAttempt.containerMgrAddress, taskAttempt.containerToken,
-          launchContext, taskAttempt.remoteTask));
+          launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
 
       // send event to speculator that our container needs are satisfied
       taskAttempt.eventHandler.handle
@@ -1471,27 +1609,7 @@ public abstract class TaskAttemptImpl im
                                                                   // Costly?
       taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
       taskAttempt.httpPort = nodeHttpInetAddr.getPort();
-      JobCounterUpdateEvent jce =
-          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
-              .getJobId());
-      jce.addCounterUpdate(
-          taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
-              JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
-              , 1);
-      taskAttempt.eventHandler.handle(jce);
-      
-      LOG.info("TaskAttempt: [" + taskAttempt.attemptId
-          + "] using containerId: [" + taskAttempt.containerID + " on NM: ["
-          + taskAttempt.containerMgrAddress + "]");
-      TaskAttemptStartedEvent tase =
-        new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-            taskAttempt.launchTime,
-            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
-            taskAttempt.shufflePort, taskAttempt.containerID,
-            taskAttempt.locality.toString(), taskAttempt.avataar.toString());
-      taskAttempt.eventHandler.handle
-          (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
+      taskAttempt.sendLaunchedEvents();
       taskAttempt.eventHandler.handle
           (new SpeculatorEvent
               (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
@@ -1540,14 +1658,8 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      long slotMillis = computeSlotMillis(taskAttempt);
-      TaskId taskId = taskAttempt.attemptId.getTaskId();
-      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
-      jce.addCounterUpdate(
-        taskId.getTaskType() == TaskType.MAP ? 
-          JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
-          slotMillis);
-      taskAttempt.eventHandler.handle(jce);
+      taskAttempt.eventHandler.handle(
+          createJobCounterUpdateEventTASucceeded(taskAttempt));
       taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1585,6 +1697,18 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private static class RecoverTransition implements
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+    @Override
+    public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
+      return taskAttempt.recover(tare.getTaskAttemptInfo(),
+          tare.getCommitter(), tare.getRecoverOutput());
+    }
+  }
+
   @SuppressWarnings({ "unchecked" })
   private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.

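Taken together, the TaskAttemptImpl changes above let an attempt in NEW be
replayed straight from a JobHistoryParser.TaskAttemptInfo record: TA_RECOVER
fires RecoverTransition, which calls recover() and settles the attempt in
SUCCEEDED, FAILED, or KILLED without passing through the launch states. The
state selection reduces to the following sketch (an illustrative standalone
helper, not code from this patch; import paths match those used elsewhere in
these files):

    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
    import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;

    // Illustrative: map a recovered status string to the internal attempt
    // state. Anything other than SUCCEEDED or FAILED is treated as KILLED,
    // the same fallback recover() applies to empty or unexpected states (the
    // output-recovery downgrade happens before this point in recover()).
    static TaskAttemptStateInternal stateForRecoveredStatus(String recovered) {
      if (TaskAttemptState.SUCCEEDED.toString().equals(recovered)) {
        return TaskAttemptStateInternal.SUCCEEDED;
      }
      if (TaskAttemptState.FAILED.toString().equals(recovered)) {
        return TaskAttemptStateInternal.FAILED;
      }
      return TaskAttemptStateInternal.KILLED;
    }
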
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Apr 13 23:05:54 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -152,6 +155,12 @@ public abstract class TaskImpl implement
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
+    .addTransition(TaskStateInternal.NEW,
+        EnumSet.of(TaskStateInternal.FAILED,
+                   TaskStateInternal.KILLED,
+                   TaskStateInternal.RUNNING,
+                   TaskStateInternal.SUCCEEDED),
+        TaskEventType.T_RECOVER, new RecoverTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -250,20 +259,16 @@ public abstract class TaskImpl implement
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
   protected int nextAttemptNumber = 0;
-  private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
-      new ArrayList<TaskAttemptInfo>();
 
-  private static final class RecoverdAttemptsComparator implements
-      Comparator<TaskAttemptInfo> {
-    @Override
-    public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
-      long diff = attempt1.getStartTime() - attempt2.getStartTime();
-      return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
-    }
-  }
-
-  private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
-      new RecoverdAttemptsComparator();
+  // For sorting task attempts by completion time
+  private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+      new Comparator<TaskAttemptInfo>() {
+        @Override
+        public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+          long diff = a.getFinishTime() - b.getFinishTime();
+          return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+        }
+      };
 
   @Override
   public TaskState getState() {
@@ -280,8 +285,7 @@ public abstract class TaskImpl implement
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-      MRAppMetrics metrics, AppContext appContext) {
+      int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
     this.conf = conf;
     this.clock = clock;
     this.jobFile = remoteJobConfFile;
@@ -307,41 +311,15 @@ public abstract class TaskImpl implement
     this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                                             MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
 
-    // See if this is from a previous generation.
-    if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.containsKey(taskId)) {
-      // This task has TaskAttempts from previous generation. We have to replay
-      // them.
-      LOG.info("Task is from previous run " + taskId);
-      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
-      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
-          taskInfo.getAllTaskAttempts();
-      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
-      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
-      Collections.sort(taskAttemptsFromPreviousGeneration,
-        RECOVERED_ATTEMPTS_COMPARATOR);
-    }
-
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      // All the previous attempts are exhausted, now start with a new
-      // generation.
-
-      // All the new TaskAttemptIDs are generated based on MR
-      // ApplicationAttemptID so that attempts from previous lives don't
-      // over-step the current one. This assumes that a task won't have more
-      // than 1000 attempts in its single generation, which is very reasonable.
-      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
-      // and requires serious medical attention.
-      nextAttemptNumber = (startCount - 1) * 1000;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
+
+    // All the new TaskAttemptIDs are generated based on MR
+    // ApplicationAttemptID so that attempts from previous lives don't
+    // over-step the current one. This assumes that a task won't have more
+    // than 1000 attempts in its single generation, which is very reasonable.
+    nextAttemptNumber = (appAttemptId - 1) * 1000;
   }
 
   @Override
@@ -600,14 +578,28 @@ public abstract class TaskImpl implement
 
   // This is always called in the Write Lock
   private void addAndScheduleAttempt(Avataar avataar) {
-    TaskAttempt attempt = createAttempt();
-    ((TaskAttemptImpl) attempt).setAvataar(avataar);
+    TaskAttempt attempt = addAttempt(avataar);
+    inProgressAttempts.add(attempt.getID());
+    //schedule the nextAttemptNumber
+    if (failedAttempts.size() > 0) {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private TaskAttemptImpl addAttempt(Avataar avataar) {
+    TaskAttemptImpl attempt = createAttempt();
+    attempt.setAvataar(avataar);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
     switch (attempts.size()) {
       case 0:
-        attempts = Collections.singletonMap(attempt.getID(), attempt);
+        attempts = Collections.singletonMap(attempt.getID(),
+            (TaskAttempt) attempt);
         break;
         
       case 1:
@@ -623,24 +615,8 @@ public abstract class TaskImpl implement
         break;
     }
 
-    // Update nextATtemptNumber
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      ++nextAttemptNumber;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-
-    inProgressAttempts.add(attempt.getID());
-    //schedule the nextAttemptNumber
-    if (failedAttempts.size() > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-        TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
+    ++nextAttemptNumber;
+    return attempt;
   }
 
   @Override
@@ -705,6 +681,16 @@ public abstract class TaskImpl implement
     }
   }
 
+  private void sendTaskStartedEvent() {
+    TaskStartedEvent tse = new TaskStartedEvent(
+        TypeConverter.fromYarn(taskId), getLaunchTime(),
+        TypeConverter.fromYarn(taskId.getTaskType()),
+        getSplitsAsString());
+    eventHandler
+        .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+    historyTaskStartGenerated = true;
+  }
+
   private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -740,6 +726,16 @@ public abstract class TaskImpl implement
     task.successfulAttempt = null;
   }
 
+  private void sendTaskSucceededEvents() {
+    eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+    LOG.info("Task succeeded with attempt " + successfulAttempt);
+    if (historyTaskStartGenerated) {
+      TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+          TaskStateInternal.SUCCEEDED);
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+    }
+  }
+
   /**
   * @return a String representation of the splits.
   *
@@ -751,6 +747,122 @@ public abstract class TaskImpl implement
 	  return "";
   }
 
+  /**
+   * Recover a completed task from a previous application attempt.
+   * @param taskInfo recovered info about the task
+   * @param committer the output committer used to recover or abort the
+   *                  task's output
+   * @param recoverTaskOutput whether to recover task outputs
+   * @return state of the task after recovery
+   */
+  private TaskStateInternal recover(TaskInfo taskInfo,
+      OutputCommitter committer, boolean recoverTaskOutput) {
+    LOG.info("Recovering task " + taskId
+        + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+    scheduledTime = taskInfo.getStartTime();
+    sendTaskStartedEvent();
+    Collection<TaskAttemptInfo> attemptInfos =
+        taskInfo.getAllTaskAttempts().values();
+
+    if (attemptInfos.size() > 0) {
+      metrics.launchedTask(this);
+    }
+
+    // recover the attempts for this task in the order they finished
+    // so task attempt completion events are ordered properly
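+    // Each recovered attempt reuses its historical attempt number (assigned
+    // from taInfo below), so save the counter and restore it afterwards.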
+    int savedNextAttemptNumber = nextAttemptNumber;
+    ArrayList<TaskAttemptInfo> taInfos =
+        new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+    Collections.sort(taInfos, TA_INFO_COMPARATOR);
+    for (TaskAttemptInfo taInfo : taInfos) {
+      nextAttemptNumber = taInfo.getAttemptId().getId();
+      TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
+      // handle the recovery inline so attempts complete before task does
+      attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+          committer, recoverTaskOutput));
+      finishedAttempts.add(attempt.getID());
+      TaskAttemptCompletionEventStatus taces = null;
+      TaskAttemptState attemptState = attempt.getState();
+      switch (attemptState) {
+      case FAILED:
+        taces = TaskAttemptCompletionEventStatus.FAILED;
+        break;
+      case KILLED:
+        taces = TaskAttemptCompletionEventStatus.KILLED;
+        break;
+      case SUCCEEDED:
+        taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unexpected attempt state during recovery: " + attemptState);
+      }
+      if (attemptState == TaskAttemptState.FAILED) {
+        failedAttempts.add(attempt.getID());
+        if (failedAttempts.size() >= maxAttempts) {
+          taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+        }
+      }
+
+      // don't clobber the successful attempt completion event
+      // TODO: this shouldn't be necessary after MAPREDUCE-4330
+      if (successfulAttempt == null) {
+        handleTaskAttemptCompletion(attempt.getID(), taces);
+        if (attemptState == TaskAttemptState.SUCCEEDED) {
+          successfulAttempt = attempt.getID();
+        }
+      }
+    }
+    nextAttemptNumber = savedNextAttemptNumber;
+
+    TaskStateInternal taskState = TaskStateInternal.valueOf(
+        taskInfo.getTaskStatus());
+    switch (taskState) {
+    case SUCCEEDED:
+      if (successfulAttempt != null) {
+        sendTaskSucceededEvents();
+      } else {
+        LOG.info("Missing successful attempt for task " + taskId
+            + ", recovering as RUNNING");
+        // there must have been a fetch failure and the retry wasn't complete
+        taskState = TaskStateInternal.RUNNING;
+        metrics.runningTask(this);
+        addAndScheduleAttempt(Avataar.VIRGIN);
+      }
+      break;
+    case FAILED:
+    case KILLED:
+    {
+      if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+        metrics.endWaitingTask(this);
+      }
+      TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+          taskInfo.getFinishTime(), taskInfo.getTaskType(),
+          taskInfo.getError(), taskInfo.getTaskStatus(),
+          taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+      eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+      eventHandler.handle(
+          new JobTaskEvent(taskId, getExternalState(taskState)));
+      break;
+    }
+    default:
+      throw new java.lang.AssertionError("Unexpected recovered task state: "
+          + taskState);
+    }
+
+    return taskState;
+  }
+
+  private static class RecoverTransition
+    implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+    @Override
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+      TaskRecoverEvent tre = (TaskRecoverEvent) event;
+      return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+          tre.getRecoverTaskOutput());
+    }
+  }
+
   private static class InitialScheduleTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
 
@@ -758,13 +870,7 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       task.addAndScheduleAttempt(Avataar.VIRGIN);
       task.scheduledTime = task.clock.getTime();
-      TaskStartedEvent tse = new TaskStartedEvent(
-          TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
-          TypeConverter.fromYarn(task.taskId.getTaskType()),
-          task.getSplitsAsString());
-      task.eventHandler
-          .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
-      task.historyTaskStartGenerated = true;
+      task.sendTaskStartedEvent();
     }
   }
 
@@ -818,16 +924,7 @@ public abstract class TaskImpl implement
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       task.successfulAttempt = taskAttemptId;
-      task.eventHandler.handle(new JobTaskEvent(
-          task.taskId, TaskState.SUCCEEDED));
-      LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      // issue kill to all other attempts
-      if (task.historyTaskStartGenerated) {
-        TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskStateInternal.SUCCEEDED);
-        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
-            tfe));
-      }
+      task.sendTaskSucceededEvents();
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
             // This is okay because it can only talk us out of sending a

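On the task side, recovery now happens inline via T_RECOVER instead of the
removed app-level recovery service: a recovering job hands each task the
TaskInfo parsed from the prior attempt's job history, and the task replays
every attempt through TaskAttemptRecoverEvent before settling its own state.
A hypothetical dispatch from the job's recovery path (the TaskRecoverEvent
constructor shape is inferred from the getters RecoverTransition uses, so
treat it as a sketch):

    // Hypothetical call site during job recovery: route each completed
    // task's history record to the task so TaskImpl.recover() can replay
    // its attempts before the job continues.
    eventHandler.handle(new TaskRecoverEvent(taskId, taskInfo,
        committer, recoverTaskOutput));
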
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Sat Apr 13 23:05:54 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -150,10 +151,14 @@ public class ContainerLauncherImpl exten
         ContainerLaunchContext containerLaunchContext =
           event.getContainer();
 
+        org.apache.hadoop.yarn.api.records.Container container =
+            BuilderUtils.newContainer(containerID, null, null,
+                event.getResource(), null, containerToken);
         // Now launch the actual container
         StartContainerRequest startRequest = Records
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerLaunchContext(containerLaunchContext);
+        startRequest.setContainer(container);
         StartContainerResponse response = proxy.startContainer(startRequest);
 
         ByteBuffer portInfo = response

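This launcher change tracks the YARN API update in this merge:
StartContainerRequest now carries a full Container record alongside the
launch context, so the launcher rebuilds one from the pieces it already
holds. In isolation the launch sequence is roughly the following sketch,
reusing only the calls shown above; containerID, event, containerToken, and
proxy come from the surrounding method, and the null arguments are fields the
launcher does not have at this point:

    org.apache.hadoop.yarn.api.records.Container container =
        BuilderUtils.newContainer(containerID, null, null,
            event.getResource(), null, containerToken);
    StartContainerRequest startRequest =
        Records.newRecord(StartContainerRequest.class);
    startRequest.setContainerLaunchContext(event.getContainer());
    startRequest.setContainer(container);
    StartContainerResponse response = proxy.startContainer(startRequest);
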
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Sat Apr 13 23:05:54 2013
@@ -23,26 +23,34 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
 
   private final ContainerLaunchContext container;
   private final Task task;
+  private final Resource resource;
 
   public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
       ContainerId containerID, String containerMgrAddress,
       ContainerToken containerToken,
-      ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+      ContainerLaunchContext containerLaunchContext, Resource resource,
+      Task remoteTask) {
     super(taskAttemptID, containerID, containerMgrAddress, containerToken,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
     this.container = containerLaunchContext;
     this.task = remoteTask;
+    this.resource = resource;
   }
 
   public ContainerLaunchContext getContainer() {
     return this.container;
   }
 
+  public Resource getResource() {
+    return this.resource;
+  }
+
   public Task getRemoteTask() {
     return this.task;
   }
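
With the Resource no longer embedded in the ContainerLaunchContext, the
assigned capability now rides on the launch event itself: TaskAttemptImpl
supplies assignedCapability when it constructs the event, and
ContainerLauncherImpl reads it back via getResource() to build the Container
record. Constructing the event against the new signature looks like this
(mirroring the TaskAttemptImpl call above, with the taskAttempt. qualifiers
dropped):

    // The added Resource argument carries the capability that previously
    // lived inside the ContainerLaunchContext.
    eventHandler.handle(new ContainerRemoteLaunchEvent(attemptId, containerID,
        containerMgrAddress, containerToken, launchContext,
        assignedCapability, remoteTask));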