You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/26 03:19:04 UTC

svn commit: r1476011 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-comm...

Author: szetszwo
Date: Fri Apr 26 01:19:00 2013
New Revision: 1476011

URL: http://svn.apache.org/r1476011
Log:
Merge r1471229 through r1476009 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapperReducerCleanup.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
      - copied unchanged from r1476009, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1471229-1476009

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr 26 01:19:00 2013
@@ -209,6 +209,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
     anyways after YARN-561. (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5069. add concrete common implementations of
+    CombineFileInputFormat (Sangjin Lee via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 
@@ -336,6 +339,23 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5146. application classloader may be used too early to load
     classes. (Sangjin Lee via tomwhite)
 
+    MAPREDUCE-4737. Ensure that mapreduce APIs are semantically consistent
+    with mapred API w.r.t Mapper.cleanup and Reducer.cleanup; in the sense that
+    cleanup is now called even if there is an error. The old mapred API
+    already ensures that Mapper.close and Reducer.close are invoked during
+    error handling. Note that it is an incompatible change, however end-users 
+    can override Mapper.run and Reducer.run to get the old (inconsistent) 
+    behaviour. (acmurthy)
+
+    MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+    synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
+    MAPREDUCE-5181. RMCommunicator should not use AMToken from the env.
+    (Vinod Kumar Vavilapalli via sseth)
+
+    MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
+    YARN-577. (Hitesh Shah via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1471229-1476009

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1471229-1476009

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Apr 26 01:19:00 2013
@@ -38,13 +38,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -280,24 +276,7 @@ public abstract class RMCommunicator ext
       throw new YarnException(e);
     }
 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenURLEncodedStr = System.getenv().get(
-          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      SecurityUtil.setTokenService(token, serviceAddr);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + token);
-      }
-      currentUser.addToken(token);
-    }
-
+    // CurrentUser should already have AMToken loaded.
     return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
       @Override
       public AMRMProtocol run() {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Apr 26 01:19:00 2013
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements C
 
   private static final String jobDir =  "localRunner/";
 
-  private static final Counters EMPTY_COUNTERS = new Counters();
-
   public long getProtocolVersion(String protocol, long clientVersion) {
     return ClientProtocol.versionID;
   }
@@ -273,10 +275,10 @@ public class LocalJobRunner implements C
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
       for (int i = 0; i < numMaps; i++) {
-        this.mapCounters[i] = EMPTY_COUNTERS;
+        this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = EMPTY_COUNTERS;
+      this.reduceCounters = new Counters();
     }
 
     /**
@@ -497,6 +499,15 @@ public class LocalJobRunner implements C
     
     public synchronized boolean statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      // Serialize as we would if distributed in order to make deep copy
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      taskStatus.write(dos);
+      dos.close();
+      taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
+      taskStatus.readFields(new DataInputStream(
+          new ByteArrayInputStream(baos.toByteArray())));
+      
       LOG.info(taskStatus.getStateString());
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -525,10 +536,10 @@ public class LocalJobRunner implements C
     public synchronized Counters getCurrentCounters() {
       if (null == mapCounters) {
         // Counters not yet initialized for job.
-        return EMPTY_COUNTERS;
+        return new Counters();
       }
 
-      Counters current = EMPTY_COUNTERS;
+      Counters current = new Counters();
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Apr 26 01:19:00 2013
@@ -434,10 +434,15 @@ public class MapTask extends Task {
       }
       statusUpdate(umbilical);
       collector.flush();
-    } finally {
-      //close
-      in.close();                               // close input
+      
+      in.close();
+      in = null;
+      
       collector.close();
+      collector = null;
+    } finally {
+      closeQuietly(in);
+      closeQuietly(collector);
     }
   }
 
@@ -753,13 +758,20 @@ public class MapTask extends Task {
           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
               mapContext);
 
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    mapPhase.complete();
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-    input.close();
-    output.close(mapperContext);
+    try {
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
+      input.close();
+      input = null;
+      output.close(mapperContext);
+      output = null;
+    } finally {
+      closeQuietly(input);
+      closeQuietly(output, mapperContext);
+    }
   }
 
   class DirectMapOutputCollector<K, V>
@@ -1949,4 +1961,55 @@ public class MapTask extends Task {
     }
   }
 
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void closeQuietly(RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (IOException ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(MapOutputCollector<OUTKEY, OUTVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+  
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> c) {
+    if (c != null) {
+      try {
+        c.close();
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
+
+  private <INKEY, INVALUE, OUTKEY, OUTVALUE>
+  void closeQuietly(
+      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> c,
+      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+          mapperContext) {
+    if (c != null) {
+      try {
+        c.close(mapperContext);
+      } catch (Exception ie) {
+        // Ignore
+        LOG.info("Ignoring exception during close for " + c, ie);
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Fri Apr 26 01:19:00 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -428,14 +429,15 @@ public class ReduceTask extends Task {
     // make output collector
     String finalName = getOutputName(getPartition());
 
-    final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
+    RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
         this, job, reporter, finalName);
-
+    final RecordWriter<OUTKEY, OUTVALUE> finalOut = out;
+    
     OutputCollector<OUTKEY,OUTVALUE> collector = 
       new OutputCollector<OUTKEY,OUTVALUE>() {
         public void collect(OUTKEY key, OUTVALUE value)
           throws IOException {
-          out.write(key, value);
+          finalOut.write(key, value);
           // indicate that progress update needs to be sent
           reporter.progress();
         }
@@ -466,20 +468,14 @@ public class ReduceTask extends Task {
         values.informReduceProgress();
       }
 
-      //Clean up: repeated in catch block below
       reducer.close();
-      out.close(reporter);
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {}
-        
-      try {
-        out.close(reporter);
-      } catch (IOException ignored) {}
+      reducer = null;
       
-      throw ioe;
+      out.close(reporter);
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, reducer);
+      closeQuietly(out, reporter);
     }
   }
 
@@ -645,7 +641,21 @@ public class ReduceTask extends Task {
                                                committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      trackedRW.close(reducerContext);
+    }
+  }
+  
+  private <OUTKEY, OUTVALUE>
+  void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
+    if (c != null) {
+      try {
+        c.close(r);
+      } catch (Exception e) {
+        LOG.info("Exception in closing " + c, e);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Mapper.java Fri Apr 26 01:19:00 2013
@@ -140,9 +140,12 @@ public class Mapper<KEYIN, VALUEIN, KEYO
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKeyValue()) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java Fri Apr 26 01:19:00 2013
@@ -166,14 +166,17 @@ public class Reducer<KEYIN,VALUEIN,KEYOU
    */
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    while (context.nextKey()) {
-      reduce(context.getCurrentKey(), context.getValues(), context);
-      // If a back up store is used, reset it
-      Iterator<VALUEIN> iter = context.getValues().iterator();
-      if(iter instanceof ReduceContext.ValueIterator) {
-        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+    try {
+      while (context.nextKey()) {
+        reduce(context.getCurrentKey(), context.getValues(), context);
+        // If a back up store is used, reset it
+        Iterator<VALUEIN> iter = context.getValues().iterator();
+        if(iter instanceof ReduceContext.ValueIterator) {
+          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
+        }
       }
+    } finally {
+      cleanup(context);
     }
-    cleanup(context);
   }
 }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1471229-1476009

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Fri Apr 26 01:19:00 2013
@@ -89,7 +89,7 @@ public class NotRunningJob implements MR
     // used for a non running job
     return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
         "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
-        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f);
   }
 
   NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1476011&r1=1476010&r2=1476011&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Fri Apr 26 01:19:00 2013
@@ -413,7 +413,7 @@ public class TestClientServiceDelegate {
     return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
         "appname", "host", 124, null, YarnApplicationState.FINISHED,
         "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
-        "N/A");
+        "N/A", 0.0f);
   }
 
   private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -423,7 +423,7 @@ public class TestClientServiceDelegate {
     return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
         "appname", host, port, null, YarnApplicationState.RUNNING,
         "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
-        "N/A");
+        "N/A", 0.0f);
   }
 
   private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {