You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:53 UTC

svn commit: r1398581 [3/9] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-...

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Tue Oct 16 00:02:55 2012
@@ -24,47 +24,6 @@
 <configuration>
 
 <property>
-  <name>mapreduce.jobtracker.jobhistory.location</name>
-  <value></value>
-  <description> If job tracker is static the history files are stored 
-  in this single well known place. If No value is set here, by default,
-  it is in the local file system at ${hadoop.log.dir}/history.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.jobhistory.task.numberprogresssplits</name>
-  <value>12</value>
-  <description> Every task attempt progresses from 0.0 to 1.0 [unless
-  it fails or is killed].  We record, for each task attempt, certain 
-  statistics over each twelfth of the progress range.  You can change
-  the number of intervals we divide the entire range of progress into
-  by setting this property.  Higher values give more precision to the
-  recorded data, but costs more memory in the job tracker at runtime.
-  Each increment in this attribute costs 16 bytes per running task.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.job.userhistorylocation</name>
-  <value></value>
-  <description> User can specify a location to store the history files of 
-  a particular job. If nothing is specified, the logs are stored in 
-  output directory. The files are stored in "_logs/history/" in the directory.
-  User can stop logging by giving the value "none". 
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.jobhistory.completed.location</name>
-  <value></value>
-  <description> The completed job history files are stored at this single well 
-  known location. If nothing is specified, the files are stored at 
-  ${mapreduce.jobtracker.jobhistory.location}/done.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.job.committer.setup.cleanup.needed</name>
   <value>true</value>
   <description> true, if job needs job-setup and job-cleanup.
@@ -99,15 +58,6 @@
 </property>
 
 <property>
-  <name>mapreduce.jobtracker.address</name>
-  <value>local</value>
-  <description>The host and port that the MapReduce job tracker runs
-  at.  If "local", then jobs are run in-process as a single map
-  and reduce task.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.local.clientfactory.class.name</name>
   <value>org.apache.hadoop.mapred.LocalClientFactory</value>
   <description>This the client factory that is responsible for 
@@ -115,138 +65,10 @@
 </property>
 
 <property>
-  <name>mapreduce.jobtracker.http.address</name>
-  <value>0.0.0.0:50030</value>
-  <description>
-    The job tracker http server address and port the server will listen on.
-    If the port is 0 then the server will start on a free port.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.handler.count</name>
-  <value>10</value>
-  <description>
-    The number of server threads for the JobTracker. This should be roughly
-    4% of the number of tasktracker nodes.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.report.address</name>
-  <value>127.0.0.1:0</value>
-  <description>The interface and port that task tracker server listens on. 
-  Since it is only connected to by the tasks, it uses the local interface.
-  EXPERT ONLY. Should only be changed if your host does not have the loopback 
-  interface.</description>
-</property>
-
-<property>
-  <name>mapreduce.cluster.local.dir</name>
-  <value>${hadoop.tmp.dir}/mapred/local</value>
-  <description>The local directory where MapReduce stores intermediate
-  data files.  May be a comma-separated list of
-  directories on different devices in order to spread disk i/o.
-  Directories that do not exist are ignored.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.system.dir</name>
-  <value>${hadoop.tmp.dir}/mapred/system</value>
-  <description>The directory where MapReduce stores control files.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.staging.root.dir</name>
-  <value>${hadoop.tmp.dir}/mapred/staging</value>
-  <description>The root of the staging area for users' job files
-  In practice, this should be the directory where users' home 
-  directories are located (usually /user)
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.cluster.temp.dir</name>
-  <value>${hadoop.tmp.dir}/mapred/temp</value>
-  <description>A shared directory for temporary files.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.local.dir.minspacestart</name>
-  <value>0</value>
-  <description>If the space in mapreduce.cluster.local.dir drops under this, 
-  do not ask for more tasks.
-  Value in bytes.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.local.dir.minspacekill</name>
-  <value>0</value>
-  <description>If the space in mapreduce.cluster.local.dir drops under this, 
-    do not ask more tasks until all the current ones have finished and 
-    cleaned up. Also, to save the rest of the tasks we have running, 
-    kill one of them, to clean up some space. Start with the reduce tasks,
-    then go with the ones that have finished the least.
-    Value in bytes.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.expire.trackers.interval</name>
-  <value>600000</value>
-  <description>Expert: The time-interval, in miliseconds, after which
-  a tasktracker is declared 'lost' if it doesn't send heartbeats.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.instrumentation</name>
-  <value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value>
-  <description>Expert: The instrumentation class to associate with each TaskTracker.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.resourcecalculatorplugin</name>
-  <value></value>
-  <description>
-   Name of the class whose instance will be used to query resource information
-   on the tasktracker.
-   
-   The class must be an instance of 
-   org.apache.hadoop.util.ResourceCalculatorPlugin. If the value is null, the
-   tasktracker attempts to use a class appropriate to the platform. 
-   Currently, the only platform supported is Linux.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.taskmemorymanager.monitoringinterval</name>
-  <value>5000</value>
-  <description>The interval, in milliseconds, for which the tasktracker waits
-   between two cycles of monitoring its tasks' memory usage. Used only if
-   tasks' memory management is enabled via mapred.tasktracker.tasks.maxmemory.
-   </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.tasks.sleeptimebeforesigkill</name>
-  <value>5000</value>
-  <description>The time, in milliseconds, the tasktracker waits for sending a
-  SIGKILL to a task, after it has been sent a SIGTERM. This is currently
-  not used on WINDOWS where tasks are just sent a SIGTERM.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.job.maps</name>
   <value>2</value>
   <description>The default number of map tasks per job.
-  Ignored when mapreduce.jobtracker.address is "local".  
+  Ignored when mapreduce.framework.name is "local".
   </description>
 </property>
 
@@ -256,54 +78,21 @@
   <description>The default number of reduce tasks per job. Typically set to 99%
   of the cluster's reduce capacity, so that if a node fails the reduces can 
   still be executed in a single wave.
-  Ignored when mapreduce.jobtracker.address is "local".
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.restart.recover</name>
-  <value>false</value>
-  <description>"true" to enable (job) recovery upon restart,
-               "false" to start afresh
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.jobhistory.block.size</name>
-  <value>3145728</value>
-  <description>The block size of the job history file. Since the job recovery
-               uses job history, its important to dump job history to disk as 
-               soon as possible. Note that this is an expert level parameter.
-               The default value is set to 3 MB.
+  Ignored when mapreduce.framework.name is "local".
   </description>
 </property>
 
 <property>
-  <name>mapreduce.jobtracker.taskscheduler</name>
-  <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-  <description>The class responsible for scheduling the tasks.</description>
-</property>
-
-
-<property>
   <name>mapreduce.job.split.metainfo.maxsize</name>
   <value>10000000</value>
   <description>The maximum permissible size of the split metainfo file. 
-  The JobTracker won't attempt to read split metainfo files bigger than
-  the configured value.
+  The MapReduce ApplicationMaster won't attempt to read submitted split metainfo
+  files bigger than this configured value.
   No limits if set to -1.
   </description>
 </property>
 
 <property>
-  <name>mapreduce.jobtracker.taskscheduler.maxrunningtasks.perjob</name>
-  <value></value>
-  <description>The maximum number of running tasks for a job before
-  it gets preempted. No limits if undefined.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.map.maxattempts</name>
   <value>4</value>
   <description>Expert: The maximum number of attempts per map task.
@@ -333,7 +122,7 @@
   <name>mapreduce.reduce.shuffle.connect.timeout</name>
   <value>180000</value>
   <description>Expert: The maximum amount of time (in milli seconds) reduce
-  task spends in trying to connect to a tasktracker for getting map output.
+  task spends in trying to connect to a remote node for getting map output.
   </description>
 </property>
 
@@ -355,51 +144,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapreduce.tasktracker.map.tasks.maximum</name>
-  <value>2</value>
-  <description>The maximum number of map tasks that will be run
-  simultaneously by a task tracker.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.reduce.tasks.maximum</name>
-  <value>2</value>
-  <description>The maximum number of reduce tasks that will be run
-  simultaneously by a task tracker.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.retiredjobs.cache.size</name>
-  <value>1000</value>
-  <description>The number of retired job status to keep in the cache.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.outofband.heartbeat</name>
-  <value>false</value>
-  <description>Expert: Set this to true to let the tasktracker send an 
-  out-of-band heartbeat on task-completion for better latency.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.jobhistory.lru.cache.size</name>
-  <value>5</value>
-  <description>The number of job history files loaded in memory. The jobs are 
-  loaded when they are first accessed. The cache is cleared based on LRU.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.instrumentation</name>
-  <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
-  <description>Expert: The instrumentation class to associate with each JobTracker.
-  </description>
-</property>
 
 <property>
   <name>mapred.child.java.opts</name>
@@ -424,7 +168,7 @@
   <description>User added environment variables for the task tracker child 
   processes. Example :
   1) A=foo  This will set the env variable A to foo
-  2) B=$B:c This is inherit tasktracker's B env variable.  
+  2) B=$B:c This inherits the nodemanager's B env variable.
   </description>
 </property>
 
@@ -568,17 +312,9 @@
   <name>mapreduce.job.speculative.slownodethreshold</name>
   <value>1.0</value>
   <description>The number of standard deviations by which a Task 
-  Tracker's ave map and reduce progress-rates (finishTime-dispatchTime)
+  Tracker's average map and reduce progress-rates (finishTime-dispatchTime)
   must be lower than the average of all successful map/reduce task's for
-  the TT to be considered too slow to give a speculative task to.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.job.jvm.numtasks</name>
-  <value>1</value>
-  <description>How many tasks to run per jvm. If set to -1, there is
-  no limit. 
+  the NodeManager to be considered too slow to give a speculative task to.
   </description>
 </property>
 
@@ -630,12 +366,6 @@
   take priority over this setting.</description>
 </property>
 
-<property>
-  <name>mapreduce.jobtracker.maxtasks.perjob</name>
-  <value>-1</value>
-  <description>The maximum number of tasks for a single job.
-  A value of -1 indicates that there is no maximum.  </description>
-</property>
 
 <property>
   <name>mapreduce.client.submit.file.replication</name>
@@ -645,41 +375,6 @@
   </description>
 </property>
 
-
-<property>
-  <name>mapreduce.tasktracker.dns.interface</name>
-  <value>default</value>
-  <description>The name of the Network Interface from which a task
-  tracker should report its IP address.
-  </description>
- </property>
- 
-<property>
-  <name>mapreduce.tasktracker.dns.nameserver</name>
-  <value>default</value>
-  <description>The host name or IP address of the name server (DNS)
-  which a TaskTracker should use to determine the host name used by
-  the JobTracker for communication and display purposes.
-  </description>
- </property>
- 
-<property>
-  <name>mapreduce.tasktracker.http.threads</name>
-  <value>40</value>
-  <description>The number of worker threads that for the http server. This is
-               used for map output fetching
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.http.address</name>
-  <value>0.0.0.0:50060</value>
-  <description>
-    The task tracker http server address and port.
-    If the port is 0 then the server will start on a free port.
-  </description>
-</property>
-
 <property>
   <name>mapreduce.task.files.preserve.failedtasks</name>
   <value>false</value>
@@ -752,54 +447,13 @@
 </property>
 
 <property>
-  <name>mapreduce.job.userlog.retain.hours</name>
-  <value>24</value>
-  <description>The maximum time, in hours, for which the user-logs are to be 
-               retained after the job completion.
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.hosts.filename</name>
-  <value></value>
-  <description>Names a file that contains the list of nodes that may
-  connect to the jobtracker.  If the value is empty, all hosts are
-  permitted.</description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.hosts.exclude.filename</name>
-  <value></value>
-  <description>Names a file that contains the list of hosts that
-  should be excluded by the jobtracker.  If the value is empty, no
-  hosts are excluded.</description>
-</property>
-
-<property>
-  <name>mapreduce.jobtracker.heartbeats.in.second</name>
-  <value>100</value>
-  <description>Expert: Approximate number of heart-beats that could arrive 
-               at JobTracker in a second. Assuming each RPC can be processed 
-               in 10msec, the default value is made 100 RPCs in a second.
-  </description>
-</property> 
-
-<property>
-  <name>mapreduce.jobtracker.tasktracker.maxblacklists</name>
-  <value>4</value>
-  <description>The number of blacklists for a taskTracker by various jobs
-               after which the task tracker could be blacklisted across
-               all jobs. The tracker will be given a tasks later
-               (after a day). The tracker will become a healthy
-               tracker after a restart.
-  </description>
-</property> 
-
-<property>
   <name>mapreduce.job.maxtaskfailures.per.tracker</name>
-  <value>4</value>
-  <description>The number of task-failures on a tasktracker of a given job 
-               after which new tasks of that job aren't assigned to it.
+  <value>3</value>
+  <description>The number of task-failures on a node manager of a given job 
+               after which new tasks of that job aren't assigned to it. It
+               MUST be less than mapreduce.map.maxattempts and
+               mapreduce.reduce.maxattempts otherwise the failed task will
+               never be tried on a different node.
   </description>
 </property>
 
@@ -817,8 +471,8 @@
     <name>mapreduce.client.completion.pollinterval</name>
     <value>5000</value>
     <description>The interval (in milliseconds) between which the JobClient
-    polls the JobTracker for updates about job status. You may want to set this
-    to a lower value to make tests run faster on a single node system. Adjusting
+    polls the MapReduce ApplicationMaster for updates about job status. You may want to
+    set this to a lower value to make tests run faster on a single node system. Adjusting
     this value in production may lead to unwanted client-server traffic.
     </description>
   </property>
@@ -833,32 +487,6 @@
     </description>
   </property>
 
-  <property>
-    <name>mapreduce.jobtracker.persist.jobstatus.active</name>
-    <value>true</value>
-    <description>Indicates if persistency of job status information is
-      active or not.
-    </description>
-  </property>
-
-  <property>
-  <name>mapreduce.jobtracker.persist.jobstatus.hours</name>
-  <value>1</value>
-  <description>The number of hours job status information is persisted in DFS.
-    The job status information will be available after it drops of the memory
-    queue and between jobtracker restarts. With a zero value the job status
-    information is not persisted at all in DFS.
-  </description>
-</property>
-
-  <property>
-    <name>mapreduce.jobtracker.persist.jobstatus.dir</name>
-    <value>/jobtracker/jobsInfo</value>
-    <description>The directory where the job status information is persisted
-      in a file system to be available after it drops of the memory queue and
-      between jobtracker restarts.
-    </description>
-  </property>
 
   <property>
     <name>mapreduce.task.profile</name>
@@ -891,8 +519,8 @@
     <description> The number of Task attempts AFTER which skip mode 
     will be kicked off. When skip mode is kicked off, the 
     tasks reports the range of records which it will process 
-    next, to the TaskTracker. So that on failures, TT knows which 
-    ones are possibly the bad records. On further executions, 
+    next, to the MR ApplicationMaster. So that on failures, the MR AM
+    knows which ones are possibly the bad records. On further executions,
     those are skipped.
     </description>
   </property>
@@ -1003,15 +631,6 @@
                 calls</description>
 </property>
   
-<!-- Proxy Configuration -->
-<property>
-  <name>mapreduce.jobtracker.taskcache.levels</name>
-  <value>2</value>
-  <description> This is the max level of the task cache. For example, if
-    the level is 2, the tasks cached are at the host level and at the rack
-    level.
-  </description>
-</property>
 
 <property>
   <name>mapreduce.job.queuename</name>
@@ -1025,18 +644,29 @@
 </property>
 
 <property>
+  <name>mapreduce.cluster.local.dir</name>
+  <value>${hadoop.tmp.dir}/mapred/local</value>
+  <description>
+      The local directory where MapReduce stores intermediate
+      data files.  May be a comma-separated list of
+      directories on different devices in order to spread disk i/o.
+      Directories that do not exist are ignored.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.cluster.acls.enabled</name>
   <value>false</value>
   <description> Specifies whether ACLs should be checked
     for authorization of users for doing various queue and job level operations.
     ACLs are disabled by default. If enabled, access control checks are made by
-    JobTracker and TaskTracker when requests are made by users for queue
+    MapReduce ApplicationMaster when requests are made by users for queue
     operations like submit job to a queue and kill a job in the queue and job
     operations like viewing the job-details (See mapreduce.job.acl-view-job)
     or for modifying the job (See mapreduce.job.acl-modify-job) using
     Map/Reduce APIs, RPCs or via the console and web user interfaces.
-    For enabling this flag(mapreduce.cluster.acls.enabled), this is to be set
-    to true in mapred-site.xml on JobTracker node and on all TaskTracker nodes.
+    To enable this flag, set it to true in the mapred-site.xml file of all
+    MapReduce clients (MR job submitting nodes).
   </description>
 </property>
 
@@ -1089,8 +719,8 @@
       o job-level counters
       o task-level counters
       o tasks' diagnostic information
-      o task-logs displayed on the TaskTracker web-UI and
-      o job.xml showed by the JobTracker's web-UI
+      o task-logs displayed on the HistoryServer's web-UI and
+      o job.xml shown by the HistoryServer's web-UI
     Every other piece of information of jobs is still accessible by any other
     user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
 
@@ -1108,18 +738,10 @@
 </property>
 
 <property>
-  <name>mapreduce.tasktracker.indexcache.mb</name>
-  <value>10</value>
-  <description> The maximum memory that a task tracker allows for the 
-    index cache that is used when serving map outputs to reducers.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.task.merge.progress.records</name>
   <value>10000</value>
   <description> The number of records to process during merge before
-   sending a progress notification to the TaskTracker.
+   sending a progress notification to the MR ApplicationMaster.
   </description>
 </property>
 
@@ -1140,22 +762,6 @@
 </property>
 
 <property>
-  <name>mapreduce.tasktracker.taskcontroller</name>
-  <value>org.apache.hadoop.mapred.DefaultTaskController</value>
-  <description>TaskController which is used to launch and manage task execution 
-  </description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.group</name>
-  <value></value>
-  <description>Expert: Group to which TaskTracker belongs. If 
-   LinuxTaskController is configured via mapreduce.tasktracker.taskcontroller,
-   the group owner of the task-controller binary should be same as this group.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.shuffle.port</name>
   <value>8080</value>
   <description>Default port that the ShuffleHandler will run on. ShuffleHandler 
@@ -1164,41 +770,7 @@
   </description>
 </property>
 
-<!--  Node health script variables -->
-
-<property>
-  <name>mapreduce.tasktracker.healthchecker.script.path</name>
-  <value></value>
-  <description>Absolute path to the script which is
-  periodicallyrun by the node health monitoring service to determine if
-  the node is healthy or not. If the value of this key is empty or the
-  file does not exist in the location configured here, the node health
-  monitoring service is not started.</description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.healthchecker.interval</name>
-  <value>60000</value>
-  <description>Frequency of the node health script to be run,
-  in milliseconds</description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.healthchecker.script.timeout</name>
-  <value>600000</value>
-  <description>Time after node health script should be killed if 
-  unresponsive and considered that the script has failed.</description>
-</property>
-
-<property>
-  <name>mapreduce.tasktracker.healthchecker.script.args</name>
-  <value></value>
-  <description>List of arguments which are to be passed to 
-  node health script when it is being launched comma seperated.
-  </description>
-</property>
-
-<!--  end of node health script variables -->
+<!-- MR YARN Application properties -->
 
 <property>
  <name>mapreduce.job.counters.limit</name>
@@ -1309,6 +881,13 @@
   <description>The amount of memory the MR AppMaster needs.</description>
 </property>
 
+<property>
+  <description>CLASSPATH for MR applications. A comma-separated list
+  of CLASSPATH entries</description>
+   <name>mapreduce.application.classpath</name>
+   <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
+</property>
+
 <!-- jobhistory properties -->
 
 <property>

Propchange: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1363593-1396941

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java Tue Oct 16 00:02:55 2012
@@ -18,14 +18,18 @@
 
 package org.apache.hadoop.mapreduce;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -50,4 +54,22 @@ public class TestJob {
     Assert.assertNotNull(job.toString());
   }
 
+  @Test
+  public void testUGICredentialsPropogation() throws Exception {
+    Credentials creds = new Credentials();
+    Token<?> token = mock(Token.class);
+    Text tokenService = new Text("service");
+    Text secretName = new Text("secret");
+    byte secret[] = new byte[]{};
+        
+    creds.addToken(tokenService,  token);
+    creds.addSecretKey(secretName, secret);
+    UserGroupInformation.getLoginUser().addCredentials(creds);
+    
+    JobConf jobConf = new JobConf();
+    Job job = new Job(jobConf);
+
+    assertSame(token, job.getCredentials().getToken(tokenService));
+    assertSame(secret, job.getCredentials().getSecretKey(secretName));
+  }
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Tue Oct 16 00:02:55 2012
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.util.Recor
 
 public class CompletedTask implements Task {
 
+  private static final Counters EMPTY_COUNTERS = new Counters();
+
   private final TaskId taskId;
   private final TaskInfo taskInfo;
   private TaskReport report;
@@ -124,7 +126,11 @@ public class CompletedTask implements Ta
     report.setFinishTime(taskInfo.getFinishTime());
     report.setTaskState(getState());
     report.setProgress(getProgress());
-    report.setCounters(TypeConverter.toYarn(getCounters()));
+    Counters counters = getCounters();
+    if (counters == null) {
+      counters = EMPTY_COUNTERS;
+    }
+    report.setCounters(TypeConverter.toYarn(counters));
     if (successfulAttempt != null) {
       report.setSuccessfulAttempt(successfulAttempt);
     }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Tue Oct 16 00:02:55 2012
@@ -23,14 +23,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -77,7 +78,7 @@ public class HistoryFileManager extends 
   private static enum HistoryInfoState {
     IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
   };
-
+  
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
 
@@ -130,7 +131,7 @@ public class HistoryFileManager extends 
     }
   }
 
-  private static class JobListCache {
+  static class JobListCache {
     private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
     private int maxSize;
     private long maxAge;
@@ -199,6 +200,29 @@ public class HistoryFileManager extends 
     }
   }
 
+  /**
+   * This class represents a user dir in the intermediate done directory.  It
+   * mainly serves as a per-directory lock so scans of one dir are serialized.
+   */
+  private class UserLogDir {
+    long modTime = 0;
+    
+    public synchronized void scanIfNeeded(FileStatus fs) {
+      long newModTime = fs.getModificationTime();
+      if (modTime != newModTime) {
+        Path p = fs.getPath();
+        try {
+          scanIntermediateDirectory(p);
+          // modTime is only updated after a successful scan, so a failed scan
+          // is retried on the next call.  We assume the failure is temporary.
+          modTime = newModTime;
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory " + p, e);
+        }
+      }
+    }
+  }
+  
   public class HistoryFileInfo {
     private Path historyFile;
     private Path confFile;
@@ -216,12 +240,14 @@ public class HistoryFileManager extends 
           : HistoryInfoState.IN_INTERMEDIATE;
     }
 
-    private synchronized boolean isMovePending() {
+    @VisibleForTesting
+    synchronized boolean isMovePending() {
       return state == HistoryInfoState.IN_INTERMEDIATE
           || state == HistoryInfoState.MOVE_FAILED;
     }
 
-    private synchronized boolean didMoveFail() {
+    @VisibleForTesting
+    synchronized boolean didMoveFail() {
       return state == HistoryInfoState.MOVE_FAILED;
     }
     
@@ -342,7 +368,7 @@ public class HistoryFileManager extends 
   }
 
   private SerialNumberIndex serialNumberIndex = null;
-  private JobListCache jobListCache = null;
+  protected JobListCache jobListCache = null;
 
   // Maintains a list of known done subdirectories.
   private final Set<Path> existingDoneSubdirs = Collections
@@ -352,13 +378,13 @@ public class HistoryFileManager extends 
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    */
-  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
+    new ConcurrentHashMap<String, UserLogDir>();
 
   private JobACLsManager aclsMgr;
 
   private Configuration conf;
 
-  private boolean debugMode;
   private String serialNumberFormat;
 
   private Path doneDirPrefixPath = null; // folder for completed jobs
@@ -379,8 +405,7 @@ public class HistoryFileManager extends 
   public void init(Configuration conf) {
     this.conf = conf;
 
-    debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
-    int serialNumberLowDigits = debugMode ? 1 : 3;
+    int serialNumberLowDigits = 3;
     serialNumberFormat = ("%0"
         + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
         + "d");
@@ -392,6 +417,7 @@ public class HistoryFileManager extends 
       doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
           new Path(doneDirPrefix));
       doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
       mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
     } catch (IOException e) {
@@ -585,23 +611,15 @@ public class HistoryFileManager extends 
 
     for (FileStatus userDir : userDirList) {
       String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name)
-            || newModificationTime > userDirModificationTimeMap.get(name)) {
-          shouldScan = true;
-          userDirModificationTimeMap.put(name, newModificationTime);
-        }
-      }
-      if (shouldScan) {
-        try {
-          scanIntermediateDirectory(userDir.getPath());
-        } catch (IOException e) {
-          LOG.error("Error while trying to scan the directory " 
-              + userDir.getPath(), e);
+      UserLogDir dir = userDirModificationTimeMap.get(name);
+      if(dir == null) {
+        dir = new UserLogDir();
+        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
+        if(old != null) {
+          dir = old;
         }
       }
+      dir.scanIfNeeded(userDir);
     }
   }
 
@@ -692,8 +710,8 @@ public class HistoryFileManager extends 
    * @throws IOException
    */
   private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
-    int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
-    String boxedSerialNumber = String.valueOf(jobSerialNumber);
+    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
+        jobId, serialNumberFormat);
     Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
     if (dateStringSet == null) {
       return null;
@@ -779,8 +797,8 @@ public class HistoryFileManager extends 
   }
 
   private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
-    String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(
-        millisecondTime, debugMode);
+    String timestampComponent = JobHistoryUtils
+        .timestampDirectoryComponent(millisecondTime);
     return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
         id, timestampComponent, serialNumberFormat));
   }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Oct 16 00:02:55 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -402,6 +404,108 @@ public class TestJobHistoryParsing {
     }
   }
   
+  @Test
+  public void testCountersForFailedTask() throws Exception {
+    LOG.info("STARTING testCountersForFailedTask");
+    try {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
+        this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    app.waitForState(job, JobState.FAILED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    FileContext fc = null;
+    try {
+      fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an expected exception " + parseException,
+        parseException);
+    for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+      CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
+      Assert.assertNotNull("completed task report has null counters",
+          ct.getReport().getCounters());
+    }
+    } finally {
+      LOG.info("FINISHED testCountersForFailedTask");
+    }
+  }
+
+  @Test
+  public void testScanningOldDirs() throws Exception {
+    LOG.info("STARTING testScanningOldDirs");
+    try {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app =
+        new MRAppWithHistory(1, 1, true,
+            this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
+    hfm.init(conf);
+    HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
+    Assert.assertNotNull("Unable to locate job history", fileInfo);
+
+    // force the manager to "forget" the job
+    hfm.deleteJobFromJobListCache(fileInfo);
+    final int msecPerSleep = 10;
+    int msecToSleep = 10 * 1000;
+    while (fileInfo.isMovePending() && msecToSleep > 0) {
+      Assert.assertTrue(!fileInfo.didMoveFail());
+      msecToSleep -= msecPerSleep;
+      Thread.sleep(msecPerSleep);
+    }
+    Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
+
+    fileInfo = hfm.getFileInfo(jobId);
+    Assert.assertNotNull("Unable to locate old job history", fileInfo);
+   } finally {
+      LOG.info("FINISHED testScanningOldDirs");
+    }
+  }
+
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
 
     public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
@@ -422,6 +526,32 @@ public class TestJobHistoryParsing {
     }
   }
 
+  static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  static class HistoryFileManagerForTest extends HistoryFileManager {
+    void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
+      jobListCache.delete(fileInfo);
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Tue Oct 16 00:02:55 2012
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
@@ -217,9 +218,23 @@ public class TestHsWebServicesJobsQuery 
   @Test
   public void testJobsQueryStateNone() throws JSONException, Exception {
     WebResource r = resource();
+
+     ArrayList<JobState> JOB_STATES = 
+         new ArrayList<JobState>(Arrays.asList(JobState.values()));
+
+      // find a state that isn't in use
+      Map<JobId, Job> jobsMap = appContext.getAllJobs();
+      for (Map.Entry<JobId, Job> entry : jobsMap.entrySet()) {
+        JOB_STATES.remove(entry.getValue().getState());
+      }
+
+    assertTrue("No unused job states", JOB_STATES.size() > 0);
+    JobState notInUse = JOB_STATES.get(0);
+
     ClientResponse response = r.path("ws").path("v1").path("history")
-        .path("mapreduce").path("jobs").queryParam("state", JobState.KILL_WAIT.toString())
+        .path("mapreduce").path("jobs").queryParam("state", notInUse.toString())
         .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+
     assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
     JSONObject json = response.getEntity(JSONObject.class);
     assertEquals("incorrect number of elements", 1, json.length());

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml Tue Oct 16 00:02:55 2012
@@ -102,7 +102,7 @@
 <property><!--Loaded from job.xml--><name>dfs.permissions.enabled</name><value>true</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.tasktracker.taskcontroller</name><value>org.apache.hadoop.mapred.DefaultTaskController</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.reduce.shuffle.parallelcopies</name><value>5</value></property>
-<property><!--Loaded from job.xml--><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,YARN_HOME</value></property>
+<property><!--Loaded from job.xml--><name>yarn.nodemanager.env-whitelist</name><value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,HADOOP_YARN_HOME</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.jobtracker.heartbeats.in.second</name><value>100</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.job.maxtaskfailures.per.tracker</name><value>4</value></property>
 <property><!--Loaded from job.xml--><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
@@ -317,8 +317,8 @@
         $HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
         $HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
         $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
-        $YARN_HOME/share/hadoop/mapreduce/*,
-        $YARN_HOME/share/hadoop/mapreduce/lib/*
+        $HADOOP_YARN_HOME/share/hadoop/mapreduce/*,
+        $HADOOP_YARN_HOME/share/hadoop/mapreduce/lib/*
      </value></property>
 <property><!--Loaded from job.xml--><name>yarn.nodemanager.log-aggregation.compression-type</name><value>gz</value></property>
 <property><!--Loaded from job.xml--><name>dfs.image.compress</name><value>false</value></property>

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Oct 16 00:02:55 2012
@@ -19,9 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,75 +38,29 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
 
-
-// TODO: This should be part of something like yarn-client.
-public class ResourceMgrDelegate {
+public class ResourceMgrDelegate extends YarnClientImpl {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
       
-  private final InetSocketAddress rmAddress;
   private YarnConfiguration conf;
-  ClientRMProtocol applicationsManager;
+  private GetNewApplicationResponse application;
   private ApplicationId applicationId;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   /**
    * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
    * @param conf the configuration object.
    */
   public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    this.rmAddress = getRmAddress(conf);
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-            rmAddress, this.conf);
-    LOG.debug("Connected to ResourceManager at " + rmAddress);
-  }
-  
-  /**
-   * Used for injecting applicationsManager, mostly for testing.
-   * @param conf the configuration object
-   * @param applicationsManager the handle to talk the resource managers 
-   *                            {@link ClientRMProtocol}.
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf, 
-      ClientRMProtocol applicationsManager) {
-    this.conf = conf;
-    this.applicationsManager = applicationsManager;
-    this.rmAddress = getRmAddress(conf);
-  }
-  
-  private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_ADDRESS,
-                              YarnConfiguration.DEFAULT_RM_PORT);
+    init(conf);
+    start();
   }
   
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
@@ -117,26 +68,15 @@ public class ResourceMgrDelegate {
     return;
   }
 
-
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
-    GetClusterNodesRequest request = 
-      recordFactory.newRecordInstance(GetClusterNodesRequest.class);
-    GetClusterNodesResponse response = 
-      applicationsManager.getClusterNodes(request);
-    return TypeConverter.fromYarnNodes(response.getNodeReports());
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
   }
 
-
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    GetAllApplicationsRequest request =
-      recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
-    GetAllApplicationsResponse response = 
-      applicationsManager.getAllApplications(request);
-    return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
   }
 
-
   public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
       InterruptedException {
     // TODO: Implement getBlacklistedTrackers
@@ -144,128 +84,56 @@ public class ResourceMgrDelegate {
     return new TaskTrackerInfo[0];
   }
 
-
   public ClusterMetrics getClusterMetrics() throws IOException,
       InterruptedException {
-    GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class);
-    GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request);
-    YarnClusterMetrics metrics = response.getClusterMetrics();
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
     ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
         metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
         metrics.getNumNodeManagers(), 0, 0);
     return oldMetrics;
   }
 
-
   @SuppressWarnings("rawtypes")
-  public Token getDelegationToken(Text renewer)
-      throws IOException, InterruptedException {
-    /* get the token from RM */
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest 
-    rmDTRequest = recordFactory.newRecordInstance(
-        org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
-    rmDTRequest.setRenewer(renewer.toString());
-    org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse 
-      response = applicationsManager.getDelegationToken(rmDTRequest);
-    DelegationToken yarnToken = response.getRMDelegationToken();
-    return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+      super.getRMDelegationToken(renewer), rmAddress);
   }
 
-
   public String getFilesystemName() throws IOException, InterruptedException {
     return FileSystem.get(conf).getUri().toString();
   }
 
   public JobID getNewJobID() throws IOException, InterruptedException {
-    GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
-    applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
     return TypeConverter.fromYarn(applicationId);
   }
 
-  private static final String ROOT = "root";
-
-  private GetQueueInfoRequest getQueueInfoRequest(String queueName, 
-      boolean includeApplications, boolean includeChildQueues, boolean recursive) {
-    GetQueueInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
-    request.setQueueName(queueName);
-    request.setIncludeApplications(includeApplications);
-    request.setIncludeChildQueues(includeChildQueues);
-    request.setRecursive(recursive);
-    return request;
-    
-  }
-  
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    GetQueueInfoRequest request = 
-      getQueueInfoRequest(queueName, true, false, false); 
-      recordFactory.newRecordInstance(GetQueueInfoRequest.class);
     return TypeConverter.fromYarn(
-        applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
+        super.getQueueInfo(queueName), this.conf);
   }
-  
-  private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, 
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
-      boolean recursive) {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues = 
-      parent.getChildQueues();
-
-    for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
-      queues.add(child);
-      if(recursive) {
-        getChildQueues(child, queues, recursive);
-      }
-    }
-  }
-
 
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
       InterruptedException {
-    GetQueueUserAclsInfoRequest request = 
-      recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class);
-    List<QueueUserACLInfo> userAcls = 
-      applicationsManager.getQueueUserAcls(request).getUserAclsInfoList();
-    return TypeConverter.fromYarnQueueUserAclsInfo(userAcls);
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
+      .getQueueAclsInfo());
   }
 
-
   public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, true);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
   }
 
-
   public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-      new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-
-    org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = 
-      applicationsManager.getQueueInfo(
-          getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
-    getChildQueues(rootQueue, queues, false);
-
-    return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
   }
 
   public QueueInfo[] getChildQueues(String parent) throws IOException,
       InterruptedException {
-      List<org.apache.hadoop.yarn.api.records.QueueInfo> queues = 
-          new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
-        
-        org.apache.hadoop.yarn.api.records.QueueInfo parentQueue = 
-          applicationsManager.getQueueInfo(
-              getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
-        getChildQueues(parentQueue, queues, true);
-        
-        return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      this.conf);
   }
 
   public String getStagingAreaDir() throws IOException, InterruptedException {
@@ -307,40 +175,6 @@ public class ResourceMgrDelegate {
     return 0;
   }
   
-  
-  public ApplicationId submitApplication(
-      ApplicationSubmissionContext appContext) 
-  throws IOException {
-    appContext.setApplicationId(applicationId);
-    SubmitApplicationRequest request = 
-        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
-    request.setApplicationSubmissionContext(appContext);
-    applicationsManager.submitApplication(request);
-    LOG.info("Submitted application " + applicationId + " to ResourceManager" +
-    		" at " + rmAddress);
-    return applicationId;
-  }
-  
-  public void killApplication(ApplicationId applicationId) throws IOException {
-    KillApplicationRequest request = 
-        recordFactory.newRecordInstance(KillApplicationRequest.class);
-    request.setApplicationId(applicationId);
-    applicationsManager.forceKillApplication(request);
-    LOG.info("Killing application " + applicationId);
-  }
-
-
-  public ApplicationReport getApplicationReport(ApplicationId appId)
-      throws YarnRemoteException {
-    GetApplicationReportRequest request = recordFactory
-        .newRecordInstance(GetApplicationReportRequest.class);
-    request.setApplicationId(appId);
-    GetApplicationReportResponse response = applicationsManager
-        .getApplicationReport(request);
-    ApplicationReport applicationReport = response.getApplicationReport();
-    return applicationReport;
-  }
-
   public ApplicationId getApplicationId() {
     return applicationId;
   }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue Oct 16 00:02:55 2012
@@ -89,7 +89,7 @@ import org.apache.hadoop.yarn.util.Proto
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@@ -345,10 +345,14 @@ public class YARNRunner implements Clien
         createApplicationResource(defaultFileContext,
             jobConfPath, LocalResourceType.FILE));
     if (jobConf.get(MRJobConfig.JAR) != null) {
-      localResources.put(MRJobConfig.JOB_JAR,
-          createApplicationResource(defaultFileContext,
-              new Path(jobSubmitDir, MRJobConfig.JOB_JAR), 
-              LocalResourceType.ARCHIVE));
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath, 
+          LocalResourceType.PATTERN);
+      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
+          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
     } else {
       // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
       // mapreduce jar itself which is already on the classpath.
@@ -368,12 +372,9 @@ public class YARNRunner implements Clien
     }
 
     // Setup security tokens
-    ByteBuffer securityTokens = null;
-    if (UserGroupInformation.isSecurityEnabled()) {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      ts.writeTokenStorageToStream(dob);
-      securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 
     // Setup the command to run the AM
     List<String> vargs = new ArrayList<String>(8);

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestJobConf.java Tue Oct 16 00:02:55 2012
@@ -21,6 +21,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public class TestJobConf {
@@ -185,4 +186,19 @@ public class TestJobConf {
     
     
   }
+
+  /**
+   * Ensure that by default JobContext.MAX_TASK_FAILURES_PER_TRACKER is less
+   * than JobContext.MAP_MAX_ATTEMPTS and JobContext.REDUCE_MAX_ATTEMPTS so that
+   * failed tasks will be retried on other nodes.
+   */
+  @Test
+  public void testMaxTaskFailuresPerTracker() {
+    JobConf jobConf = new JobConf(true);
+    Assert.assertTrue("By default JobContext.MAX_TASK_FAILURES_PER_TRACKER was "
+      + "not less than JobContext.MAP_MAX_ATTEMPTS and REDUCE_MAX_ATTEMPTS"
+      ,jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxMapAttempts() &&
+      jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts()
+      );
+  }
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java Tue Oct 16 00:02:55 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
@@ -33,7 +34,6 @@ import org.apache.hadoop.mapred.*;
  * statistics data to be collected by subsequent reducers.
  * 
  */
-@SuppressWarnings("deprecation")
 public abstract class IOMapperBase<T> extends Configured
     implements Mapper<Text, LongWritable, Text, Text> {
   
@@ -41,6 +41,7 @@ public abstract class IOMapperBase<T> ex
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
+  protected Closeable stream;
 
   public IOMapperBase() { 
   }
@@ -79,6 +80,18 @@ public abstract class IOMapperBase<T> ex
                        long value) throws IOException;
 
   /**
+   * Create an input or output stream based on the specified file.
+   * Subclasses should override this method to provide an actual stream.
+   * 
+   * @param name file name
+   * @return the stream
+   * @throws IOException
+   */
+  public Closeable getIOStream(String name) throws IOException {
+    return null;
+  }
+
+  /**
    * Collect stat data to be combined by a subsequent reducer.
    * 
    * @param output
@@ -113,9 +126,15 @@ public abstract class IOMapperBase<T> ex
     long longValue = value.get();
     
     reporter.setStatus("starting " + name + " ::host = " + hostName);
-    
+
+    this.stream = getIOStream(name);
+    T statValue = null;
     long tStart = System.currentTimeMillis();
-    T statValue = doIO(reporter, name, longValue);
+    try {
+      statValue = doIO(reporter, name, longValue);
+    } finally {
+      if(stream != null) stream.close();
+    }
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);