You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/14 01:06:06 UTC
svn commit: r1467713 [1/3] - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-a...
Author: szetszwo
Date: Sat Apr 13 23:05:54 2013
New Revision: 1467713
URL: http://svn.apache.org/r1467713
Log:
Merging r1466653 through r1467712 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/
- copied from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/TestJobInfo.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/TestJobInfo.java
Removed:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
Modified:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/dao/JobInfo.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/ExampleDriver.java
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project:r1430995-1467533
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1466653-1467712
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Sat Apr 13 23:05:54 2013
@@ -14,10 +14,11 @@ Trunk (Unreleased)
MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
with poor implementations of Object#hashCode(). (Radim Kolar via cutting)
- IMPROVEMENTS
+ HADOOP-8562. Enhancements to support Hadoop on Windows Server and Windows
+ Azure environments. (See breakdown of tasks below for subtasks and
+ contributors)
- MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
- faster job submission. (amarrk)
+ IMPROVEMENTS
MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -30,9 +31,6 @@ Trunk (Unreleased)
MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests.
(Vinay Thota via amarrk)
- MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
- tasks in Gridmix. (amarrk)
-
MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent
fair scheduler pools. (Ahmed Radwan via todd)
@@ -71,39 +69,14 @@ Trunk (Unreleased)
MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
(Brandon Li via suresh)
+ MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
+ (Srikanth Sundarrajan via amareshwari)
+
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
(Yu Gao via llu)
- MAPREDUCE-4356. [Rumen] Provide access to the method
- ParsedTask.obtainTaskAttempts(). (ravigummadi)
-
- MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for
- map only jobs. (amarrk)
-
- MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
- strings. (ravigummadi)
-
- MAPREDUCE-4083. [Gridmix] NPE in cpu emulation. (amarrk)
-
- MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
- become slow in some cases (ravigummadi).
-
- MAPREDUCE-3953. [Gridmix] Gridmix throws NPE and does not simulate a
- job if the trace contains null taskStatus for a task.
- (ravigummadi)
-
- MAPREDUCE-3829. [Gridmix] Gridmix should give better error message when
- input data directory already exists and -generate option is
- given.(ravigummadi)
-
- MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
- counter is wrong when compressed input is used.(ravigummadi)
-
- MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
- sortFinished times when needed.(ravigummadi)
-
MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
(Jason Lowe via bobby)
@@ -155,33 +128,34 @@ Trunk (Unreleased)
MAPREDUCE-5012. Typo in javadoc for IdentityMapper class. (Adam Monsen
via suresh)
- MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
- (Sandy Ryza via tomwhite)
+ MAPREDUCE-5078. TestMRAppMaster fails on Windows due to mismatched path
+ separators. (Chris Nauroth via sseth)
-Release 2.0.5-alpha - UNRELEASED
+ MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris
+ Nauroth via bikas)
- INCOMPATIBLE CHANGES
+ BREAKDOWN OF HADOOP-8562 SUBTASKS
- NEW FEATURES
+ MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
+ (Chris Nauroth via suresh)
- IMPROVEMENTS
-
- MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
- acmurthy)
+ MAPREDUCE-4780. MapReduce distribution build fails on Windows.
+ (Chris Nauroth via suresh)
- OPTIMIZATIONS
+ MAPREDUCE-4790. MapReduce build script would be more readable using abspath.
+ (Chris Nauroth via suresh)
- BUG FIXES
+ MAPREDUCE-4869. Fix TestMapReduceChildJVM. (Chris Nauroth via acmurthy)
- MAPREDUCE-5113. Streaming input/output types are ignored with java
- mapper/reducer. (sandyr via tucu)
+ MAPREDUCE-4870. Fix TestMRJobsWithHistoryService. (Chris Nauroth via acmurthy)
- MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
+ MAPREDUCE-4983. Fixed various platform specific assumptions in various tests,
+ so that they can pass on Windows too. (Chris Nauroth via vinodkv)
- MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
- command from the RM. (Jian He via jlowe)
+ HADOOP-9372. Fix bad timeout annotations on tests.
+ (Arpit Agarwal via suresh)
-Release 2.0.4-beta - UNRELEASED
+Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
@@ -189,9 +163,41 @@ Release 2.0.4-beta - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
+ tasks in Gridmix. (amarrk via tgraves)
+
MAPREDUCE-5033. mapred shell script should respect usage flags
(--help -help -h). (Andrew Wang via atm)
+ MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+ allocation on small clusters. (Bikas Saha via vinodkv)
+
+ MAPREDUCE-4990. Construct debug strings conditionally in
+ ShuffleHandler.Shuffle#sendMapOutput(). (kkambatl via tucu)
+
+ MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred
+ (Aleksey Gorshkov via bobby)
+
+ MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
+ acmurthy)
+
+ MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
+ faster job submission. (amarrk via tgraves)
+
+ MAPREDUCE-5079. Changes job recovery to restore state directly from job
+ history, instead of simulating state machine events.
+ (Jason Lowe and Robert Parker via sseth)
+
+ MAPREDUCE-4981. Add WordMean, WordMedian, WordStandardDeviation
+ to ExamplesDriver. (Plamen Jeliazkov via shv)
+
+ MAPREDUCE-5059. Change average merge time on Job overview page to be the
+ time delta between the end of the shuffle and the start of the reduce.
+ (Omkar Vinit Joshi via vinodkv)
+
+ MAPREDUCE-4985. Add compression option to TestDFSIO usage.
+ (Plamen Jeliazkov via shv)
+
OPTIMIZATIONS
BUG FIXES
@@ -218,15 +224,108 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
(Sandy Ryza via tomwhite)
+ MAPREDUCE-4693. History server should include counters for failed tasks.
+ (Xuan Gong via sseth)
+
+ MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does
+ not exist. (sandyr via tucu)
+
+ MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+ appropriately used and that on-disk segments are correctly sorted on
+ file-size. (Anty Rao and Ravi Prakash via acmurthy)
+
+ MAPREDUCE-4571. TestHsWebServicesJobs fails on jdk7. (tgraves via tucu)
+
+ MAPREDUCE-4716. TestHsWebServicesJobsQuery.testJobsQueryStateInvalid
+ fails with jdk7. (tgraves via tucu)
+
+ MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream
+ does not close the wrapped InputStream. (Chris Nauroth via szetszwo)
+
+ MAPREDUCE-3872. Fix event handling races in ContainerLauncherImpl.
+ (Robert Kanter via sseth)
+
+ MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. (Zhijie Shen via
+ vinodkv)
+
+ MAPREDUCE-3829. [Gridmix] Gridmix should give better error message when
+ input data directory already exists and -generate option is
+ given.(ravigummadi via tgraves)
+
+ MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
+ counter is wrong when compressed input is used.(ravigummadi via tgraves)
+
+ MAPREDUCE-3953. [Gridmix] Gridmix throws NPE and does not simulate a
+ job if the trace contains null taskStatus for a task. (ravigummadi via
+ tgraves)
+
+ MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can
+ become slow in some cases (ravigummadi via tgraves).
+
+ MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related
+ code. (Karthik Kambatla via sseth)
+
+ MAPREDUCE-4083. [Gridmix] NPE in cpu emulation. (amarrk via tgraves)
+
+ MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for
+ map only jobs. (amarrk via tgraves)
+
+ MAPREDUCE-4356. [Rumen] Provide access to the method
+ ParsedTask.obtainTaskAttempts(). (ravigummadi via tgraves)
+
+ MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
+ strings. (ravigummadi via tgraves)
+
+ MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
+ sortFinished times when needed. (Ravi Gummadi via tgraves)
+
+ MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
+ Joshi via vinodkv)
+
+ MAPREDUCE-5113. Streaming input/output types are ignored with java
+ mapper/reducer. (sandyr via tucu)
+
+ MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
+
+ MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
+ command from the RM. (Jian He via jlowe)
+
+ MAPREDUCE-5137. AM web UI: clicking on Map Task results in 500 error
+ (Thomas Graves via jlowe)
+
+ MAPREDUCE-5136. TestJobImpl->testJobNoTasks fails with IBM JAVA (Amir
+ Sanjar via jlowe)
+
+ MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
+ YARN-486. (Xuan Gong via vinodkv)
+
+Release 2.0.4-alpha - UNRELEASED
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+ MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
+ (Sandy Ryza via tomwhite)
+
+ MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
+ submitting a job (Daryn Sharp via cos)
+
MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
fix failures in renewal of HistoryServer's delegations tokens. (Siddharth
Seth via vinodkv)
- MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
- submitting a job (Daryn Sharp via cos)
+ MAPREDUCE-5083. MiniMRCluster should use a random component when creating an
+ actual cluster (Siddharth Seth via hitesh)
- MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit
- Joshi via vinodkv)
+ MAPREDUCE-5094. Disabled memory monitoring by default in MiniMRYarnCluster
+ to avoid some downstream tests failing. (Siddharth Seth via vinodkv)
Release 2.0.3-alpha - 2013-02-06
@@ -744,6 +843,18 @@ Release 2.0.0-alpha - 05-23-2012
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
bad (Jason Lowe via bobby)
+Release 0.23.8 - UNRELEASED
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -758,6 +869,12 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi
Prakash via jlowe)
+ MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
+ (Robert Parker via jeagles)
+
+ MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory
+ (Aleksey Gorshkov via bobby)
+
OPTIMIZATIONS
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
@@ -777,11 +894,36 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
the value from the Task commitAttempt member (Robert Parker via jeagles)
+ MAPREDUCE-4871. AM uses mapreduce.jobtracker.split.metainfo.maxsize but
+ mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
+ jeagles)
+
+ MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
+ shutdown (Jason Lowe via jeagles)
+
+ MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
+ backup and eventually OOM (Jason Lowe via bobby)
+
+ MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
+ Prakash via tgraves)
+
+ MAPREDUCE-5060. Fetch failures that time out only count against the first
+ map task (Robert Joseph Evans via jlowe)
+
+ MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
+ (Jason Lowe via bobby)
+
+ MAPREDUCE-5053. java.lang.InternalError from decompression codec cause
+ reducer to fail (Robert Parker via jeagles)
+
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey
Gorshkov via tgraves)
+ MAPREDUCE-5137. AM web UI: clicking on Map Task results in 500 error
+ (Thomas Graves via jlowe)
+
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt:r1430995-1467533
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1466653-1467712
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1466653-1467712
Merged /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/conf:r1430995-1467533
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Apr 13 23:05:54 2013
@@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -167,7 +175,6 @@ public class MRAppMaster extends Composi
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
- private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private EventHandler<CommitterEvent> committerEventHandler;
@@ -180,7 +187,6 @@ public class MRAppMaster extends Composi
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
- private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
@@ -193,6 +199,8 @@ public class MRAppMaster extends Composi
private String shutDownMessage = null;
JobStateInternal forcedState = null;
+ private long recoveredJobStartTime = 0;
+
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) {
@@ -340,34 +348,9 @@ public class MRAppMaster extends Composi
}
} else {
committer = createOutputCommitter(conf);
- boolean recoveryEnabled = conf.getBoolean(
- MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
- boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-
- // If a shuffle secret was not provided by the job client then this app
- // attempt will generate one. However that disables recovery if there
- // are reducers as the shuffle secret would be app attempt specific.
- boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
- TokenCache.getShuffleSecretKey(fsTokens) != null);
-
- if (recoveryEnabled && recoverySupportedByCommitter
- && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
- LOG.info("Recovery is enabled. "
- + "Will try to recover from previous life on best effort basis.");
- recoveryServ = createRecoveryService(context);
- addIfService(recoveryServ);
- dispatcher = recoveryServ.getDispatcher();
- clock = recoveryServ.getClock();
- inRecovery = true;
- } else {
- LOG.info("Not starting RecoveryService: recoveryEnabled: "
- + recoveryEnabled + " recoverySupportedByCommitter: "
- + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
- + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
- + appAttemptID.getAttemptId());
- dispatcher = createDispatcher();
- addIfService(dispatcher);
- }
+
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
//service to handle requests from JobClient
clientService = createClientService(context);
@@ -595,15 +578,6 @@ public class MRAppMaster extends Composi
return new JobFinishEventHandler();
}
- /**
- * Create the recovery service.
- * @return an instance of the recovery service.
- */
- protected Recovery createRecoveryService(AppContext appContext) {
- return new RecoveryService(appContext.getApplicationAttemptId(),
- appContext.getClock(), getCommitter(), isNewApiCommitter());
- }
-
/** Create and initialize (but don't start) a single job.
* @param forcedState a state to force the job into or null for normal operation.
* @param diagnostic a diagnostic message to include with the job.
@@ -615,7 +589,8 @@ public class MRAppMaster extends Composi
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
- completedTasksFromPreviousRun, metrics, newApiCommitter,
+ completedTasksFromPreviousRun, metrics,
+ committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -978,18 +953,8 @@ public class MRAppMaster extends Composi
public void start() {
amInfos = new LinkedList<AMInfo>();
-
- // Pull completedTasks etc from recovery
- if (inRecovery) {
- completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
- amInfos = recoveryServ.getAMInfos();
- } else {
- // Get the amInfos anyways irrespective of whether recovery is enabled or
- // not IF this is not the first AM generation
- if (appAttemptID.getAttemptId() != 1) {
- amInfos.addAll(readJustAMInfos());
- }
- }
+ completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
+ processRecovery();
// Create an AMInfo for the current AM generation.
AMInfo amInfo =
@@ -1051,13 +1016,105 @@ public class MRAppMaster extends Composi
startJobs();
}
+ /**
+ * Decide whether this AM attempt should recover task state from the
+ * previous attempt's job history, and perform the recovery (or fall back
+ * to reading just the AMInfos). No-op on the first attempt.
+ */
+ private void processRecovery() {
+ if (appAttemptID.getAttemptId() == 1) {
+ return; // no need to recover on the first attempt
+ }
+
+ boolean recoveryEnabled = getConfig().getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+ boolean recoverySupportedByCommitter =
+ committer != null && committer.isRecoverySupported();
+
+ // If a shuffle secret was not provided by the job client then this app
+ // attempt will generate one. However that disables recovery if there
+ // are reducers as the shuffle secret would be app attempt specific.
+ int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+ boolean shuffleKeyValidForRecovery =
+ TokenCache.getShuffleSecretKey(fsTokens) != null;
+
+ // Fix: the shuffle key only matters when there are reducers. The old
+ // test (numReduceTasks > 0 && key != null) wrongly disabled recovery
+ // for map-only jobs, which never use the shuffle secret at all.
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ try {
+ parsePreviousJobHistory();
+ } catch (IOException e) {
+ LOG.warn("Unable to parse prior job history, aborting recovery", e);
+ // try to get just the AMInfos
+ amInfos.addAll(readJustAMInfos());
+ }
+ } else {
+ LOG.info("Will not try to recover. recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
+ // Get the amInfos anyways whether recovery is enabled or not
+ amInfos.addAll(readJustAMInfos());
+ }
+ }
+
+ /**
+ * Open the job history file written by the previous application attempt.
+ * @param conf configuration used to resolve the history path and filesystem
+ * @param appAttemptId the current attempt; the path helper derives the
+ * previous attempt's history location from it
+ * @return an open stream on the prior attempt's history file; the caller
+ * is responsible for closing it
+ * @throws IOException if the history file cannot be resolved or opened
+ */
+ private static FSDataInputStream getPreviousJobHistoryStream(
+ Configuration conf, ApplicationAttemptId appAttemptId)
+ throws IOException {
+ Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
+ appAttemptId);
+ LOG.info("Previous history file is at " + historyFile);
+ return historyFile.getFileSystem(conf).open(historyFile);
+ }
+
+ /**
+ * Parse the previous attempt's job history file and populate recovery
+ * state: completedTasksFromPreviousRun (SUCCEEDED tasks only, with their
+ * completed attempts), recoveredJobStartTime, and the AMInfos of all
+ * earlier AM generations.
+ * @throws IOException if the prior history file cannot be opened or read
+ */
+ private void parsePreviousJobHistory() throws IOException {
+ FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
+ appAttemptID);
+ JobHistoryParser parser = new JobHistoryParser(in);
+ JobInfo jobInfo = parser.parse();
+ // A parse error is tolerated: whatever events parsed cleanly before the
+ // failure are still used for best-effort recovery.
+ Exception parseException = parser.getParseException();
+ if (parseException != null) {
+ LOG.info("Got an error parsing job-history file" +
+ ", ignoring incomplete events.", parseException);
+ }
+ Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+ .getAllTasks();
+ for (TaskInfo taskInfo : taskInfos.values()) {
+ // Only SUCCEEDED tasks are candidates for recovery; everything else
+ // will be scheduled from scratch.
+ if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+ Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+ taskInfo.getAllTaskAttempts().entrySet().iterator();
+ // Strip attempts that have no completion record in the history
+ // (presumably events cut off mid-write) so recovery only replays
+ // attempts with a full event trail -- TODO confirm.
+ while (taskAttemptIterator.hasNext()) {
+ Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+ if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+ taskAttemptIterator.remove();
+ }
+ }
+ completedTasksFromPreviousRun
+ .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+ LOG.info("Read from history task "
+ + TypeConverter.toYarn(taskInfo.getTaskId()));
+ }
+ }
+ LOG.info("Read completed tasks from history "
+ + completedTasksFromPreviousRun.size());
+ // Preserve the original job start time across AM restarts (consumed by
+ // the JobStartEvent sent from startJobs()).
+ recoveredJobStartTime = jobInfo.getLaunchTime();
+
+ // recover AMInfos
+ List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
+ if (jhAmInfoList != null) {
+ for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
+ AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+ jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+ jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+ jhAmInfo.getNodeManagerHttpPort());
+ amInfos.add(amInfo);
+ }
+ }
+ }
+
private List<AMInfo> readJustAMInfos() {
List<AMInfo> amInfos = new ArrayList<AMInfo>();
FSDataInputStream inputStream = null;
try {
- inputStream =
- RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
- appAttemptID);
+ inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
EventReader jobHistoryEventReader = new EventReader(inputStream);
// All AMInfos are contiguous. Track when the first AMStartedEvent
@@ -1108,7 +1165,8 @@ public class MRAppMaster extends Composi
@SuppressWarnings("unchecked")
protected void startJobs() {
/** create a job-start event to get this ball rolling */
- JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+ JobEvent startJobEvent = new JobStartEvent(job.getID(),
+ recoveredJobStartTime);
/** send the job-start event. this triggers the job execution. */
dispatcher.getEventHandler().handle(startJobEvent);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Sat Apr 13 23:05:54 2013
@@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
//Producer:Task
TA_SCHEDULE,
TA_RESCHEDULE,
+ TA_RECOVER,
//Producer:Client, Task
TA_KILL,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java Sat Apr 13 23:05:54 2013
@@ -28,6 +28,7 @@ public enum TaskEventType {
//Producer:Job
T_SCHEDULE,
+ T_RECOVER,
//Producer:Speculator
T_ADD_SPEC_ATTEMPT,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Apr 13 23:05:54 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -159,6 +162,7 @@ public class JobImpl implements org.apac
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
+ private final OutputCommitter committer;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
@@ -602,7 +606,7 @@ public class JobImpl implements org.apac
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
- boolean newApiCommitter, String userName,
+ OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
JobStateInternal forcedState, String forcedDiagnostic) {
this.applicationAttemptId = applicationAttemptId;
@@ -618,6 +622,7 @@ public class JobImpl implements org.apac
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
this.oldJobId = TypeConverter.fromYarn(jobId);
+ this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.taskAttemptListener = taskAttemptListener;
@@ -888,10 +893,16 @@ public class JobImpl implements org.apac
}
}
- protected void scheduleTasks(Set<TaskId> taskIDs) {
+ protected void scheduleTasks(Set<TaskId> taskIDs,
+ boolean recoverTaskOutput) {
for (TaskId taskID : taskIDs) {
- eventHandler.handle(new TaskEvent(taskID,
- TaskEventType.T_SCHEDULE));
+ TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
+ if (taskInfo != null) {
+ eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
+ committer, recoverTaskOutput));
+ } else {
+ eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
+ }
}
}
@@ -1421,7 +1432,7 @@ public class JobImpl implements org.apac
job.conf, splits[i],
job.taskAttemptListener,
job.jobToken, job.fsTokens,
- job.clock, job.completedTasksFromPreviousRun,
+ job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@@ -1439,7 +1450,6 @@ public class JobImpl implements org.apac
job.conf, job.numMapTasks,
job.taskAttemptListener, job.jobToken,
job.fsTokens, job.clock,
- job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@@ -1475,8 +1485,8 @@ public class JobImpl implements org.apac
@Override
public void transition(JobImpl job, JobEvent event) {
job.setupProgress = 1.0f;
- job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
- job.scheduleTasks(job.reduceTasks);
+ job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
+ job.scheduleTasks(job.reduceTasks, true);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
@@ -1507,7 +1517,12 @@ public class JobImpl implements org.apac
*/
@Override
public void transition(JobImpl job, JobEvent event) {
- job.startTime = job.clock.getTime();
+ JobStartEvent jse = (JobStartEvent) event;
+ if (jse.getRecoveredJobStartTime() != 0) {
+ job.startTime = jse.getRecoveredJobStartTime();
+ } else {
+ job.startTime = job.clock.getTime();
+ }
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Sat Apr 13 23:05:54 2013
@@ -18,17 +18,13 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImp
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ appAttemptId, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Sat Apr 13 23:05:54 2013
@@ -18,16 +18,12 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -47,11 +43,10 @@ public class ReduceTaskImpl extends Task
int numMapTasks, TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, jobToken, credentials, clock,
- completedTasksFromPreviousRun, startCount, metrics, appContext);
+ appAttemptId, metrics, appContext);
this.numMapTasks = numMapTasks;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sat Apr 13 23:05:54 2013
@@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
@@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -111,6 +114,7 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -205,6 +209,11 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
+ EnumSet.of(TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+ .addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -759,8 +768,8 @@ public abstract class TaskAttemptImpl im
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container = BuilderUtils
- .newContainerLaunchContext(null, conf
- .get(MRJobConfig.USER_NAME), null, localResources,
+ .newContainerLaunchContext(conf
+ .get(MRJobConfig.USER_NAME), localResources,
environment, null, serviceData, taskCredentialsBuffer,
applicationACLs);
@@ -769,10 +778,9 @@ public abstract class TaskAttemptImpl im
static ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
- ContainerId containerID, Configuration conf,
- Token<JobTokenIdentifier> jobToken, Task remoteTask,
+ Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
final org.apache.hadoop.mapred.JobID oldJobId,
- Resource assignedCapability, WrappedJvmID jvmID,
+ WrappedJvmID jvmID,
TaskAttemptListener taskAttemptListener,
Credentials credentials) {
@@ -805,7 +813,7 @@ public abstract class TaskAttemptImpl im
// Construct the actual Container
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
- containerID, commonContainerSpec.getUser(), assignedCapability,
+ commonContainerSpec.getUser(),
commonContainerSpec.getLocalResources(), myEnv, commands,
myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
applicationACLs);
@@ -1082,6 +1090,102 @@ public abstract class TaskAttemptImpl im
this.avataar = avataar;
}
+ /**
+ * Rebuild this attempt's state from a prior attempt's history record and
+ * map it to a terminal internal state.
+ * Restores container/host/timing/counter fields from taInfo, replays the
+ * launch events, optionally recovers committed task output, then emits
+ * the matching counter-update and job-history completion events.
+ * @param taInfo history record of the attempt being recovered
+ * @param committer used for recoverTask/abortTask of the attempt's output
+ * @param recoverOutput if true and the attempt SUCCEEDED, attempt to
+ * recover its committed output; on failure the attempt is downgraded
+ * to KILLED and its output aborted
+ * @return SUCCEEDED, FAILED, or KILLED; any status other than
+ * SUCCEEDED/FAILED/KILLED (including empty) is recovered as KILLED
+ */
+ @SuppressWarnings("unchecked")
+ public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
+ OutputCommitter committer, boolean recoverOutput) {
+ containerID = taInfo.getContainerId();
+ containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ + taInfo.getPort());
+ containerMgrAddress = StringInterner.weakIntern(
+ containerNodeId.toString());
+ nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ + taInfo.getHttpPort());
+ computeRackAndLocality();
+ launchTime = taInfo.getStartTime();
+ // History may lack a finish time (-1); substitute the current time.
+ finishTime = (taInfo.getFinishTime() != -1) ?
+ taInfo.getFinishTime() : clock.getTime();
+ shufflePort = taInfo.getShufflePort();
+ trackerName = taInfo.getHostname();
+ httpPort = taInfo.getHttpPort();
+ // Re-emit launch counter + TaskAttemptStartedEvent so the new history
+ // file records this attempt as well.
+ sendLaunchedEvents();
+
+ reportedStatus.id = attemptId;
+ reportedStatus.progress = 1.0f;
+ reportedStatus.counters = taInfo.getCounters();
+ reportedStatus.stateString = taInfo.getState();
+ reportedStatus.phase = Phase.CLEANUP;
+ reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
+ reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
+ reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
+ addDiagnosticInfo(taInfo.getError());
+
+ boolean needToClean = false;
+ String recoveredState = taInfo.getTaskStatus();
+ if (recoverOutput
+ && TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+ TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptId));
+ try {
+ committer.recoverTask(tac);
+ LOG.info("Recovered output from task attempt " + attemptId);
+ } catch (Exception e) {
+ // Output could not be recovered: downgrade to KILLED so the task is
+ // re-run, and remember to abort the stale output below.
+ LOG.error("Unable to recover task attempt " + attemptId, e);
+ LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
+ recoveredState = TaskAttemptState.KILLED.toString();
+ needToClean = true;
+ }
+ }
+
+ TaskAttemptStateInternal attemptState;
+ if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
+ attemptState = TaskAttemptStateInternal.SUCCEEDED;
+ reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
+ eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
+ logAttemptFinishedEvent(attemptState);
+ } else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
+ attemptState = TaskAttemptStateInternal.FAILED;
+ reportedStatus.taskState = TaskAttemptState.FAILED;
+ eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(this,
+ TaskAttemptStateInternal.FAILED);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+ } else {
+ // Anything other than SUCCEEDED/FAILED/KILLED collapses to KILLED.
+ if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
+ // NOTE(review): String.valueOf(null) is "null", not empty, so a null
+ // status takes the "unexpected state" warn branch -- confirm intended.
+ if (String.valueOf(recoveredState).isEmpty()) {
+ LOG.info("TaskAttempt" + attemptId
+ + " had not completed, recovering as KILLED");
+ } else {
+ LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+ + recoveredState + ", recovering as KILLED");
+ }
+ addDiagnosticInfo("Killed during application recovery");
+ needToClean = true;
+ }
+ attemptState = TaskAttemptStateInternal.KILLED;
+ reportedStatus.taskState = TaskAttemptState.KILLED;
+ eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(this,
+ TaskAttemptStateInternal.KILLED);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
+ }
+
+ // Best-effort abort of output for attempts recovered as not-successful;
+ // failures are logged and ignored.
+ if (needToClean) {
+ TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptId));
+ try {
+ committer.abortTask(tac);
+ } catch (Exception e) {
+ LOG.warn("Task cleanup failed for attempt " + attemptId, e);
+ }
+ }
+
+ return attemptState;
+ }
+
private static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
@@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl im
}
}
+ /**
+ * Resolve the rack of the assigned container's node and classify this
+ * attempt's locality: NODE_LOCAL if the node is one of the split's data
+ * hosts, else RACK_LOCAL if its rack is one of the data racks, else
+ * OFF_SWITCH. Requires containerNodeId to be set by the caller.
+ */
+ private void computeRackAndLocality() {
+ nodeRackName = RackResolver.resolve(
+ containerNodeId.getHost()).getNetworkLocation();
+
+ locality = Locality.OFF_SWITCH;
+ if (dataLocalHosts.size() > 0) {
+ String cHost = resolveHost(containerNodeId.getHost());
+ if (dataLocalHosts.contains(cHost)) {
+ locality = Locality.NODE_LOCAL;
+ }
+ }
+ // Only consider rack locality when node locality was not established.
+ if (locality == Locality.OFF_SWITCH) {
+ if (dataLocalRacks.contains(nodeRackName)) {
+ locality = Locality.RACK_LOCAL;
+ }
+ }
+ }
+
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
@@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl im
return slotMillisIncrement;
}
+ /**
+ * Build the job-counter update for a succeeded attempt: credits the
+ * attempt's slot-milliseconds to SLOTS_MILLIS_MAPS or
+ * SLOTS_MILLIS_REDUCES depending on the task type.
+ * @param taskAttempt the attempt whose slot usage is charged
+ * @return the counter-update event for the attempt's job
+ */
+ private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
+ TaskAttemptImpl taskAttempt) {
+ long slotMillis = computeSlotMillis(taskAttempt);
+ TaskId taskId = taskAttempt.attemptId.getTaskId();
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+ jce.addCounterUpdate(
+ taskId.getTaskType() == TaskType.MAP ?
+ JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+ slotMillis);
+ return jce;
+ }
+
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl im
return tauce;
}
+ /**
+ * Emit the events recorded when this attempt launches: a job-counter
+ * update incrementing TOTAL_LAUNCHED_MAPS/REDUCES, and a
+ * TaskAttemptStartedEvent for the job history file. Reads containerID,
+ * launchTime, trackerName, httpPort, shufflePort, locality and avataar,
+ * so those fields must be populated first (used both on real launch and
+ * during recovery).
+ */
+ @SuppressWarnings("unchecked")
+ private void sendLaunchedEvents() {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
+ .getJobId());
+ jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+ JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
+ eventHandler.handle(jce);
+
+ LOG.info("TaskAttempt: [" + attemptId
+ + "] using containerId: [" + containerID + " on NM: ["
+ + containerMgrAddress + "]");
+ TaskAttemptStartedEvent tase =
+ new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ launchTime, trackerName, httpPort, shufflePort, containerID,
+ locality.toString(), avataar.toString());
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
+ }
+
private WrappedProgressSplitsBlock getProgressSplitBlock() {
readLock.lock();
try {
@@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl im
taskAttempt.containerNodeId.toString());
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
cEvent.getContainer().getNodeHttpAddress());
- taskAttempt.nodeRackName = RackResolver.resolve(
- taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1354,32 +1506,18 @@ public abstract class TaskAttemptImpl im
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
- taskAttempt.locality = Locality.OFF_SWITCH;
- if (taskAttempt.dataLocalHosts.size() > 0) {
- String cHost = taskAttempt.resolveHost(
- taskAttempt.containerNodeId.getHost());
- if (taskAttempt.dataLocalHosts.contains(cHost)) {
- taskAttempt.locality = Locality.NODE_LOCAL;
- }
- }
- if (taskAttempt.locality == Locality.OFF_SWITCH) {
- if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
- taskAttempt.locality = Locality.RACK_LOCAL;
- }
- }
+ taskAttempt.computeRackAndLocality();
//launch the container
//create the container object to be launched for a given Task attempt
ContainerLaunchContext launchContext = createContainerLaunchContext(
- cEvent.getApplicationACLs(), taskAttempt.containerID,
- taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
- taskAttempt.oldJobId, taskAttempt.assignedCapability,
- taskAttempt.jvmID, taskAttempt.taskAttemptListener,
- taskAttempt.credentials);
+ cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
+ taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
+ taskAttempt.taskAttemptListener, taskAttempt.credentials);
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
taskAttempt.attemptId, taskAttempt.containerID,
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
- launchContext, taskAttempt.remoteTask));
+ launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle
@@ -1471,27 +1609,7 @@ public abstract class TaskAttemptImpl im
// Costly?
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
- JobCounterUpdateEvent jce =
- new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
- .getJobId());
- jce.addCounterUpdate(
- taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
- JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
- , 1);
- taskAttempt.eventHandler.handle(jce);
-
- LOG.info("TaskAttempt: [" + taskAttempt.attemptId
- + "] using containerId: [" + taskAttempt.containerID + " on NM: ["
- + taskAttempt.containerMgrAddress + "]");
- TaskAttemptStartedEvent tase =
- new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- taskAttempt.launchTime,
- nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
- taskAttempt.shufflePort, taskAttempt.containerID,
- taskAttempt.locality.toString(), taskAttempt.avataar.toString());
- taskAttempt.eventHandler.handle
- (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
+ taskAttempt.sendLaunchedEvents();
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
@@ -1540,14 +1658,8 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
- long slotMillis = computeSlotMillis(taskAttempt);
- TaskId taskId = taskAttempt.attemptId.getTaskId();
- JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
- jce.addCounterUpdate(
- taskId.getTaskType() == TaskType.MAP ?
- JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
- slotMillis);
- taskAttempt.eventHandler.handle(jce);
+ taskAttempt.eventHandler.handle(
+ createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
@@ -1585,6 +1697,18 @@ public abstract class TaskAttemptImpl im
}
}
+ private static class RecoverTransition implements
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
+ return taskAttempt.recover(tare.getTaskAttemptInfo(),
+ tare.getCommitter(), tare.getRecoverOutput());
+ }
+ }
+
@SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Apr 13 23:05:54 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
@@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@@ -152,6 +155,12 @@ public abstract class TaskImpl implement
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
+ .addTransition(TaskStateInternal.NEW,
+ EnumSet.of(TaskStateInternal.FAILED,
+ TaskStateInternal.KILLED,
+ TaskStateInternal.RUNNING,
+ TaskStateInternal.SUCCEEDED),
+ TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -250,20 +259,16 @@ public abstract class TaskImpl implement
// By default, the next TaskAttempt number is zero. Changes during recovery
protected int nextAttemptNumber = 0;
- private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
- new ArrayList<TaskAttemptInfo>();
- private static final class RecoverdAttemptsComparator implements
- Comparator<TaskAttemptInfo> {
- @Override
- public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
- long diff = attempt1.getStartTime() - attempt2.getStartTime();
- return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
- }
- }
-
- private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
- new RecoverdAttemptsComparator();
+ // For sorting task attempts by completion time
+ private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
+ new Comparator<TaskAttemptInfo>() {
+ @Override
+ public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
+ long diff = a.getFinishTime() - b.getFinishTime();
+ return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+ }
+ };
@Override
public TaskState getState() {
@@ -280,8 +285,7 @@ public abstract class TaskImpl implement
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -307,41 +311,15 @@ public abstract class TaskImpl implement
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
- // See if this is from a previous generation.
- if (completedTasksFromPreviousRun != null
- && completedTasksFromPreviousRun.containsKey(taskId)) {
- // This task has TaskAttempts from previous generation. We have to replay
- // them.
- LOG.info("Task is from previous run " + taskId);
- TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
- Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
- taskInfo.getAllTaskAttempts();
- taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
- taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
- Collections.sort(taskAttemptsFromPreviousGeneration,
- RECOVERED_ATTEMPTS_COMPARATOR);
- }
-
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- // All the previous attempts are exhausted, now start with a new
- // generation.
-
- // All the new TaskAttemptIDs are generated based on MR
- // ApplicationAttemptID so that attempts from previous lives don't
- // over-step the current one. This assumes that a task won't have more
- // than 1000 attempts in its single generation, which is very reasonable.
- // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
- // and requires serious medical attention.
- nextAttemptNumber = (startCount - 1) * 1000;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
+
+ // All the new TaskAttemptIDs are generated based on MR
+ // ApplicationAttemptID so that attempts from previous lives don't
+ // over-step the current one. This assumes that a task won't have more
+ // than 1000 attempts in its single generation, which is very reasonable.
+ nextAttemptNumber = (appAttemptId - 1) * 1000;
}
@Override
@@ -600,14 +578,28 @@ public abstract class TaskImpl implement
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
- TaskAttempt attempt = createAttempt();
- ((TaskAttemptImpl) attempt).setAvataar(avataar);
+ TaskAttempt attempt = addAttempt(avataar);
+ inProgressAttempts.add(attempt.getID());
+ //schedule the nextAttemptNumber
+ if (failedAttempts.size() > 0) {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_RESCHEDULE));
+ } else {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_SCHEDULE));
+ }
+ }
+
+ private TaskAttemptImpl addAttempt(Avataar avataar) {
+ TaskAttemptImpl attempt = createAttempt();
+ attempt.setAvataar(avataar);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
switch (attempts.size()) {
case 0:
- attempts = Collections.singletonMap(attempt.getID(), attempt);
+ attempts = Collections.singletonMap(attempt.getID(),
+ (TaskAttempt) attempt);
break;
case 1:
@@ -623,24 +615,8 @@ public abstract class TaskImpl implement
break;
}
- // Update nextATtemptNumber
- if (taskAttemptsFromPreviousGeneration.isEmpty()) {
- ++nextAttemptNumber;
- } else {
- // There are still some TaskAttempts from previous generation, use them
- nextAttemptNumber =
- taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
- }
-
- inProgressAttempts.add(attempt.getID());
- //schedule the nextAttemptNumber
- if (failedAttempts.size() > 0) {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_RESCHEDULE));
- } else {
- eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_SCHEDULE));
- }
+ ++nextAttemptNumber;
+ return attempt;
}
@Override
@@ -705,6 +681,16 @@ public abstract class TaskImpl implement
}
}
+ private void sendTaskStartedEvent() {
+ TaskStartedEvent tse = new TaskStartedEvent(
+ TypeConverter.fromYarn(taskId), getLaunchTime(),
+ TypeConverter.fromYarn(taskId.getTaskType()),
+ getSplitsAsString());
+ eventHandler
+ .handle(new JobHistoryEvent(taskId.getJobId(), tse));
+ historyTaskStartGenerated = true;
+ }
+
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@@ -740,6 +726,16 @@ public abstract class TaskImpl implement
task.successfulAttempt = null;
}
+ private void sendTaskSucceededEvents() {
+ eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+ LOG.info("Task succeeded with attempt " + successfulAttempt);
+ if (historyTaskStartGenerated) {
+ TaskFinishedEvent tfe = createTaskFinishedEvent(this,
+ TaskStateInternal.SUCCEEDED);
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ }
+ }
+
/**
* @return a String representation of the splits.
*
@@ -751,6 +747,122 @@ public abstract class TaskImpl implement
return "";
}
+ /**
+ * Recover a completed task from a previous application attempt
+ * @param taskInfo recovered info about the task
+ * @param recoverTaskOutput whether to recover task outputs
+ * @return state of the task after recovery
+ */
+ private TaskStateInternal recover(TaskInfo taskInfo,
+ OutputCommitter committer, boolean recoverTaskOutput) {
+ LOG.info("Recovering task " + taskId
+ + " from prior app attempt, status was " + taskInfo.getTaskStatus());
+
+ scheduledTime = taskInfo.getStartTime();
+ sendTaskStartedEvent();
+ Collection<TaskAttemptInfo> attemptInfos =
+ taskInfo.getAllTaskAttempts().values();
+
+ if (attemptInfos.size() > 0) {
+ metrics.launchedTask(this);
+ }
+
+ // recover the attempts for this task in the order they finished
+ // so task attempt completion events are ordered properly
+ int savedNextAttemptNumber = nextAttemptNumber;
+ ArrayList<TaskAttemptInfo> taInfos =
+ new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
+ Collections.sort(taInfos, TA_INFO_COMPARATOR);
+ for (TaskAttemptInfo taInfo : taInfos) {
+ nextAttemptNumber = taInfo.getAttemptId().getId();
+ TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
+ // handle the recovery inline so attempts complete before task does
+ attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
+ committer, recoverTaskOutput));
+ finishedAttempts.add(attempt.getID());
+ TaskAttemptCompletionEventStatus taces = null;
+ TaskAttemptState attemptState = attempt.getState();
+ switch (attemptState) {
+ case FAILED:
+ taces = TaskAttemptCompletionEventStatus.FAILED;
+ break;
+ case KILLED:
+ taces = TaskAttemptCompletionEventStatus.KILLED;
+ break;
+ case SUCCEEDED:
+ taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected attempt state during recovery: " + attemptState);
+ }
+ if (attemptState == TaskAttemptState.FAILED) {
+ failedAttempts.add(attempt.getID());
+ if (failedAttempts.size() >= maxAttempts) {
+ taces = TaskAttemptCompletionEventStatus.TIPFAILED;
+ }
+ }
+
+ // don't clobber the successful attempt completion event
+ // TODO: this shouldn't be necessary after MAPREDUCE-4330
+ if (successfulAttempt == null) {
+ handleTaskAttemptCompletion(attempt.getID(), taces);
+ if (attemptState == TaskAttemptState.SUCCEEDED) {
+ successfulAttempt = attempt.getID();
+ }
+ }
+ }
+ nextAttemptNumber = savedNextAttemptNumber;
+
+ TaskStateInternal taskState = TaskStateInternal.valueOf(
+ taskInfo.getTaskStatus());
+ switch (taskState) {
+ case SUCCEEDED:
+ if (successfulAttempt != null) {
+ sendTaskSucceededEvents();
+ } else {
+ LOG.info("Missing successful attempt for task " + taskId
+ + ", recovering as RUNNING");
+ // there must have been a fetch failure and the retry wasn't complete
+ taskState = TaskStateInternal.RUNNING;
+ metrics.runningTask(this);
+ addAndScheduleAttempt(Avataar.VIRGIN);
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ {
+ if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
+ metrics.endWaitingTask(this);
+ }
+ TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
+ taskInfo.getFinishTime(), taskInfo.getTaskType(),
+ taskInfo.getError(), taskInfo.getTaskStatus(),
+ taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+ eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
+ eventHandler.handle(
+ new JobTaskEvent(taskId, getExternalState(taskState)));
+ break;
+ }
+ default:
+ throw new java.lang.AssertionError("Unexpected recovered task state: "
+ + taskState);
+ }
+
+ return taskState;
+ }
+
+ private static class RecoverTransition
+ implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+ TaskRecoverEvent tre = (TaskRecoverEvent) event;
+ return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
+ tre.getRecoverTaskOutput());
+ }
+ }
+
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@@ -758,13 +870,7 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
task.scheduledTime = task.clock.getTime();
- TaskStartedEvent tse = new TaskStartedEvent(
- TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
- TypeConverter.fromYarn(task.taskId.getTaskType()),
- task.getSplitsAsString());
- task.eventHandler
- .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
- task.historyTaskStartGenerated = true;
+ task.sendTaskStartedEvent();
}
}
@@ -818,16 +924,7 @@ public abstract class TaskImpl implement
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
- task.eventHandler.handle(new JobTaskEvent(
- task.taskId, TaskState.SUCCEEDED));
- LOG.info("Task succeeded with attempt " + task.successfulAttempt);
- // issue kill to all other attempts
- if (task.historyTaskStartGenerated) {
- TaskFinishedEvent tfe = createTaskFinishedEvent(task,
- TaskStateInternal.SUCCEEDED);
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
- tfe));
- }
+ task.sendTaskSucceededEvents();
for (TaskAttempt attempt : task.attempts.values()) {
if (attempt.getID() != task.successfulAttempt &&
// This is okay because it can only talk us out of sending a
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Sat Apr 13 23:05:54 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -150,10 +151,14 @@ public class ContainerLauncherImpl exten
ContainerLaunchContext containerLaunchContext =
event.getContainer();
+ org.apache.hadoop.yarn.api.records.Container container =
+ BuilderUtils.newContainer(containerID, null, null,
+ event.getResource(), null, containerToken);
// Now launch the actual container
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
+ startRequest.setContainer(container);
StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Sat Apr 13 23:05:54 2013
@@ -23,26 +23,34 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
private final ContainerLaunchContext container;
private final Task task;
+ private final Resource resource;
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
ContainerId containerID, String containerMgrAddress,
ContainerToken containerToken,
- ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+ ContainerLaunchContext containerLaunchContext, Resource resource,
+ Task remoteTask) {
super(taskAttemptID, containerID, containerMgrAddress, containerToken,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
this.container = containerLaunchContext;
this.task = remoteTask;
+ this.resource = resource;
}
public ContainerLaunchContext getContainer() {
return this.container;
}
+ public Resource getResource() {
+ return this.resource;
+ }
+
public Task getRemoteTask() {
return this.task;
}