Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/02/10 02:49:30 UTC

svn commit: r1242635 [6/10] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -22,10 +22,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
@@ -33,6 +31,8 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.util.ResourceBundles;
 
+import com.google.common.collect.Iterators;
+
 /**
  * An abstract class to provide common implementation of the
  * generic counter group in both mapred and mapreduce package.
@@ -45,7 +45,8 @@ public abstract class AbstractCounterGro
 
   private final String name;
   private String displayName;
-  private final Map<String, T> counters = Maps.newTreeMap();
+  private final ConcurrentMap<String, T> counters =
+      new ConcurrentSkipListMap<String, T>();
   private final Limits limits;
 
   public AbstractCounterGroup(String name, String displayName,
@@ -56,7 +57,7 @@ public abstract class AbstractCounterGro
   }
 
   @Override
-  public synchronized String getName() {
+  public String getName() {
     return name;
   }
 
@@ -79,7 +80,7 @@ public abstract class AbstractCounterGro
   @Override
   public synchronized T addCounter(String counterName, String displayName,
                                    long value) {
-    String saveName = limits.filterCounterName(counterName);
+    String saveName = Limits.filterCounterName(counterName);
     T counter = findCounterImpl(saveName, false);
     if (counter == null) {
       return addCounterImpl(saveName, displayName, value);
@@ -95,8 +96,10 @@ public abstract class AbstractCounterGro
   }
 
   @Override
-  public T findCounter(String counterName, String displayName) {
-    String saveName = limits.filterCounterName(counterName);
+  public synchronized T findCounter(String counterName, String displayName) {
+    // Take lock to avoid two threads not finding a counter and trying to add
+    // the same counter.
+    String saveName = Limits.filterCounterName(counterName);
     T counter = findCounterImpl(saveName, false);
     if (counter == null) {
       return addCounterImpl(saveName, displayName, 0);
@@ -105,11 +108,13 @@ public abstract class AbstractCounterGro
   }
 
   @Override
-  public synchronized T findCounter(String counterName, boolean create) {
-    return findCounterImpl(limits.filterCounterName(counterName), create);
+  public T findCounter(String counterName, boolean create) {
+    return findCounterImpl(Limits.filterCounterName(counterName), create);
   }
 
-  private T findCounterImpl(String counterName, boolean create) {
+  // Lock the object. Cannot simply use concurrent constructs on the counters
+  // data-structure (like putIfAbsent) because of localization, limits etc.
+  private synchronized T findCounterImpl(String counterName, boolean create) {
     T counter = counters.get(counterName);
     if (counter == null && create) {
       String localized =
@@ -141,7 +146,7 @@ public abstract class AbstractCounterGro
   protected abstract T newCounter();
 
   @Override
-  public synchronized Iterator<T> iterator() {
+  public Iterator<T> iterator() {
     return counters.values().iterator();
   }
 

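The change above swaps the synchronized TreeMap for a ConcurrentSkipListMap: getName() and iterator() no longer need a lock, while counter creation still funnels through the synchronized findCounterImpl() so two threads racing on the same missing name get one shared counter instead of two. A minimal sketch of the pattern this protects, written against the public mapreduce Counters facade (group and counter names here are made up):

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;

public class CounterRaceDemo {
  public static void main(String[] args) throws InterruptedException {
    final Counters counters = new Counters();
    Runnable work = new Runnable() {
      @Override
      public void run() {
        // First caller creates the counter; the synchronized creation path
        // guarantees the second caller sees the same instance.
        Counter c = counters.findCounter("demo.group", "RECORDS");
        c.increment(1);
      }
    };
    Thread t1 = new Thread(work);
    Thread t2 = new Thread(work);
    t1.start(); t2.start();
    t1.join(); t2.join();
    // Both increments land on one counter, so this prints 2.
    System.out.println(counters.findCounter("demo.group", "RECORDS").getValue());
  }
}
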
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java Fri Feb 10 01:49:08 2012
@@ -18,18 +18,18 @@
 
 package org.apache.hadoop.mapreduce.counters;
 
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
@@ -39,7 +39,10 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskCounter;
-import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
 
 /**
  * An abstract class to provide common implementation for the Counters
@@ -60,8 +63,10 @@ public abstract class AbstractCounters<C
    * A cache from enum values to the associated counter.
    */
   private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
-  private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
-  private Map<String, G> groups = Maps.newTreeMap();  // other groups
+  //framework & fs groups
+  private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
+  // other groups
+  private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
   private final CounterGroupFactory<C, G> groupFactory;
 
   // For framework counter serialization without strings
@@ -171,7 +176,8 @@ public abstract class AbstractCounters<C
   @InterfaceAudience.Private
   public synchronized C findCounter(String scheme, FileSystemCounter key) {
     return ((FileSystemCounterGroup<C>) getGroup(
-        FileSystemCounter.class.getName())).findCounter(scheme, key);
+        FileSystemCounter.class.getName()).getUnderlyingGroup()).
+        findCounter(scheme, key);
   }
 
   /**
@@ -213,7 +219,7 @@ public abstract class AbstractCounters<C
   private String filterGroupName(String oldName) {
     String newName = legacyMap.get(oldName);
     if (newName == null) {
-      return limits.filterGroupName(oldName);
+      return Limits.filterGroupName(oldName);
     }
     LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
     return newName;
@@ -241,11 +247,11 @@ public abstract class AbstractCounters<C
     WritableUtils.writeVInt(out, groupFactory.version());
     WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
     for (G group : fgroups.values()) {
-      if (group instanceof FrameworkCounterGroup<?, ?>) {
+      if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
         WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
         WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
         group.write(out);
-      } else if (group instanceof FileSystemCounterGroup<?>) {
+      } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
         WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
         group.write(out);
       }

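write() now classifies each group by group.getUnderlyingGroup() rather than by the group object itself, because groups handed back through the old mapred API are facades wrapping the real FrameworkCounterGroup or FileSystemCounterGroup and would never match the instanceof checks directly. A sketch of the dispatch shape (the kindOf helper is hypothetical, not part of the patch):

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;

public class GroupKind {
  /** Classifies a possibly-wrapped counter group the way write() does. */
  static <T extends Counter> String kindOf(CounterGroupBase<T> group) {
    // Unwrap first: a facade would never itself be an instance of the
    // framework or file-system group classes.
    CounterGroupBase<T> real = group.getUnderlyingGroup();
    if (real instanceof FrameworkCounterGroup<?, ?>) {
      return "FRAMEWORK";
    } else if (real instanceof FileSystemCounterGroup<?>) {
      return "FILESYSTEM";
    }
    return "GENERIC";
  }
}
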
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java Fri Feb 10 01:49:08 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.counters;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
@@ -98,4 +99,11 @@ public interface CounterGroupBase<T exte
    * @param rightGroup  the group to be added to this group
    */
   void incrAllCounters(CounterGroupBase<T> rightGroup);
+  
+  @Private
+  /**
+   * Exposes the underlying group type if a facade.
+   * @return the underlying object that this object is wrapping up.
+   */
+  CounterGroupBase<T> getUnderlyingGroup();
 }

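The new getUnderlyingGroup() contract has exactly two sensible implementations: a plain group returns this (as the concrete groups in this commit do), and a facade returns whatever it wraps. A sketch of the facade side, with ForwardingGroup as a hypothetical wrapper:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;

/** Hypothetical facade: delegates everything, including the unwrap hook. */
abstract class ForwardingGroup<T extends Counter>
    implements CounterGroupBase<T> {
  private final CounterGroupBase<T> realGroup;

  ForwardingGroup(CounterGroupBase<T> realGroup) {
    this.realGroup = realGroup;
  }

  @Override
  public CounterGroupBase<T> getUnderlyingGroup() {
    // The one method a facade must not answer with "this": callers use it
    // to reach the real FrameworkCounterGroup/FileSystemCounterGroup.
    return realGroup.getUnderlyingGroup();
  }
}
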
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -110,6 +110,11 @@ public abstract class FileSystemCounterG
     public void readFields(DataInput in) throws IOException {
       assert false : "shouldn't be called";
     }
+
+    @Override
+    public Counter getUnderlyingCounter() {
+      return this;
+    }
   }
 
   @Override
@@ -231,10 +236,10 @@ public abstract class FileSystemCounterG
   @Override
   @SuppressWarnings("unchecked")
   public void incrAllCounters(CounterGroupBase<C> other) {
-    if (checkNotNull(other, "other group")
+    if (checkNotNull(other.getUnderlyingGroup(), "other group")
         instanceof FileSystemCounterGroup<?>) {
       for (Counter counter : other) {
-        FSCounter c = (FSCounter) counter;
+        FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
         findCounter(c.scheme, c.key) .increment(counter.getValue());
       }
     }
@@ -253,7 +258,7 @@ public abstract class FileSystemCounterG
       for (Object counter : entry.getValue()) {
         if (counter == null) continue;
         @SuppressWarnings("unchecked")
-        FSCounter c = (FSCounter) counter;
+        FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
         WritableUtils.writeVInt(out, c.key.ordinal());  // key
         WritableUtils.writeVLong(out, c.getValue());    // value
       }

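The same unwrap-before-cast rule now applies one level down: iterating a facade group can yield wrapped counters, so the FSCounter casts above go through getUnderlyingCounter() first. A sketch of the safe pattern, assuming the patched Counter interface declares getUnderlyingCounter() (the @Override annotations above imply it does; the interface change lands in another part of this commit):

import org.apache.hadoop.mapreduce.Counter;

public class CounterUnwrap {
  /** Sums counter values, unwrapping any facade counters first. */
  static long total(Iterable<? extends Counter> counters) {
    long sum = 0;
    for (Counter c : counters) {
      // getUnderlyingCounter() returns the counter itself unless it is a
      // wrapper, in which case the wrapped instance comes back.
      sum += c.getUnderlyingCounter().getValue();
    }
    return sum;
  }
}
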
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -18,21 +18,24 @@
 
 package org.apache.hadoop.mapreduce.counters;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import static com.google.common.base.Preconditions.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.util.ResourceBundles;
 
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
 /**
  * An abstract class to provide common implementation for the framework
  * counter group in both mapred and mapreduce packages.
@@ -43,7 +46,8 @@ import org.apache.hadoop.mapreduce.util.
 @InterfaceAudience.Private
 public abstract class FrameworkCounterGroup<T extends Enum<T>,
     C extends Counter> implements CounterGroupBase<C> {
-
+  private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
+  
   private final Class<T> enumClass; // for Enum.valueOf
   private final Object[] counters;  // local casts are OK and save a class ref
   private String displayName = null;
@@ -95,6 +99,11 @@ public abstract class FrameworkCounterGr
     public void readFields(DataInput in) throws IOException {
       assert false : "shouldn't be called";
     }
+
+    @Override
+    public Counter getUnderlyingCounter() {
+      return this;
+    }
   }
 
   @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java Fri Feb 10 01:49:08 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
 
 /**
  * A generic counter implementation
@@ -101,4 +102,9 @@ public class GenericCounter extends Abst
   public synchronized void increment(long incr) {
     value += incr;
   }
+
+  @Override
+  public Counter getUnderlyingCounter() {
+    return this;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java Fri Feb 10 01:49:08 2012
@@ -42,11 +42,11 @@ public class Limits {
     return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
   }
 
-  public String filterCounterName(String name) {
+  public static String filterCounterName(String name) {
     return filterName(name, COUNTER_NAME_MAX);
   }
 
-  public String filterGroupName(String name) {
+  public static String filterGroupName(String name) {
     return filterName(name, GROUP_NAME_MAX);
   }
 

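With the instance state gone, name clamping is a pure function of the name and the configured maximum, so the filters become static and callers stop threading a Limits object around. A quick sketch of the effect on an over-long counter name:

import org.apache.hadoop.mapreduce.counters.Limits;

public class NameClamp {
  public static void main(String[] args) {
    StringBuilder longName = new StringBuilder();
    for (int i = 0; i < 100; i++) {
      longName.append('x');
    }
    // No Limits instance needed any more: the clamp depends only on the
    // name and the configured maximum length.
    String clamped = Limits.filterCounterName(longName.toString());
    System.out.println(clamped.length()); // at most the configured max
  }
}
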
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Feb 10 01:49:08 2012
@@ -24,8 +24,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,9 +55,13 @@ import org.apache.hadoop.yarn.api.record
 @InterfaceStability.Unstable
 public class JobHistoryParser {
 
+  private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
+  
   private final FSDataInputStream in;
-  JobInfo info = null;
+  private JobInfo info = null;
 
+  private IOException parseException = null;
+  
   /**
    * Create a job history parser for the given history file using the 
    * given file system
@@ -91,30 +98,58 @@ public class JobHistoryParser {
    * The first invocation will populate the object, subsequent calls
    * will return the already parsed object. 
    * The input stream is closed on return 
+   * 
+   * This api ignores partial records and stops parsing on encountering one.
+   * {@link #getParseException()} can be used to fetch the exception, if any.
+   * 
    * @return The populated jobInfo object
    * @throws IOException
+   * @see #getParseException()
    */
   public synchronized JobInfo parse() throws IOException {
+    return parse(new EventReader(in)); 
+  }
+
+  /**
+   * Only used for unit tests.
+   */
+  @Private
+  public synchronized JobInfo parse(EventReader reader) throws IOException {
 
     if (info != null) {
       return info;
     }
 
-    EventReader reader = new EventReader(in);
+    info = new JobInfo();
 
+    int eventCtr = 0;
     HistoryEvent event;
-    info = new JobInfo();
     try {
       while ((event = reader.getNextEvent()) != null) {
         handleEvent(event);
-      }
+        ++eventCtr;
+      } 
+    } catch (IOException ioe) {
+      LOG.info("Caught exception parsing history file after " + eventCtr + 
+          " events", ioe);
+      parseException = ioe;
     } finally {
       in.close();
     }
     return info;
   }
   
-  private void handleEvent(HistoryEvent event) throws IOException { 
+  /**
+   * Get the parse exception, if any.
+   * 
+   * @return the parse exception, if any
+   * @see #parse()
+   */
+  public synchronized IOException getParseException() {
+    return parseException;
+  }
+  
+  private void handleEvent(HistoryEvent event)  { 
     EventType type = event.getEventType();
 
     switch (type) {

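parse() now logs a mid-stream IOException together with the number of events it managed to read, keeps the partial JobInfo, and parks the exception for later retrieval instead of aborting the whole read; callers that care about truncated history files must ask for it explicitly. A minimal caller sketch (the history file path is made up):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class HistoryRead {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    JobHistoryParser parser =
        new JobHistoryParser(fs, new Path("/tmp/job_1234_0001.jhist"));
    JobInfo info = parser.parse(); // returns whatever parsed cleanly
    IOException parseError = parser.getParseException();
    if (parseError != null) {
      // The history file was partial; info holds events up to the failure.
      System.err.println("History truncated: " + parseError);
    }
  }
}
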
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -19,16 +19,16 @@
 package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 /** An {@link OutputCommitter} that commits files specified 
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileOutputCommitter extends OutputCommitter {
-
   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
 
-  /**
-   * Temporary directory name 
+  /** 
+   * Name of directory where pending data is placed.  Data that has not been
+   * committed yet.
    */
-  protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String PENDING_DIR_NAME = "_temporary";
   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
+  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
     "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-  private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
 
   /**
    * Create a file output committer
-   * @param outputPath the job's output path
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
    * @param context the task's context
    * @throws IOException
    */
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
+    this(outputPath, (JobContext)context);
+    if (outputPath != null) {
+      workPath = getTaskAttemptPath(context, outputPath);
+    }
+  }
+  
+  /**
+   * Create a file output committer
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
+   * @param context the task's context
+   * @throws IOException
+   */
+  @Private
+  public FileOutputCommitter(Path outputPath, 
+                             JobContext context) throws IOException {
     if (outputPath != null) {
-      this.outputPath = outputPath;
-      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
-      workPath = new Path(outputPath,
-                          getTaskAttemptBaseDirName(context))
-                          .makeQualified(outputFileSystem);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      this.outputPath = fs.makeQualified(outputPath);
+    }
+  }
+  
+  /**
+   * @return the path where final output of the job should be placed.  This
+   * could also be considered the committed application attempt path.
+   */
+  private Path getOutputPath() {
+    return this.outputPath;
+  }
+  
+  /**
+   * @return true if we have an output path set, else false.
+   */
+  private boolean hasOutputPath() {
+    return this.outputPath != null;
+  }
+  
+  /**
+   * @return the path where the output of pending job attempts are
+   * stored.
+   */
+  private Path getPendingJobAttemptsPath() {
+    return getPendingJobAttemptsPath(getOutputPath());
+  }
+  
+  /**
+   * Get the location of pending job attempts.
+   * @param out the base output directory.
+   * @return the location of pending job attempts.
+   */
+  private static Path getPendingJobAttemptsPath(Path out) {
+    return new Path(out, PENDING_DIR_NAME);
+  }
+  
+  /**
+   * Get the Application Attempt Id for this job
+   * @param context the context to look in
+   * @return the Application Attempt Id for a given job.
+   */
+  private static int getAppAttemptId(JobContext context) {
+    return context.getConfiguration().getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @return the path to store job attempt data.
+   */
+  public Path getJobAttemptPath(JobContext context) {
+    return getJobAttemptPath(context, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @param out the output path to place these in.
+   * @return the path to store job attempt data.
+   */
+  public static Path getJobAttemptPath(JobContext context, Path out) {
+    return getJobAttemptPath(getAppAttemptId(context), out);
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private Path getJobAttemptPath(int appAttemptId) {
+    return getJobAttemptPath(appAttemptId, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private static Path getJobAttemptPath(int appAttemptId, Path out) {
+    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
+  }
+  
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private Path getPendingTaskAttemptsPath(JobContext context) {
+    return getPendingTaskAttemptsPath(context, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
+    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
+  }
+  
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return new Path(getPendingTaskAttemptsPath(context), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+  
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @param out The output path to put things in.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+    return new Path(getPendingTaskAttemptsPath(context, out), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+  
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  public Path getCommittedTaskPath(TaskAttemptContext context) {
+    return getCommittedTaskPath(getAppAttemptId(context), context);
+  }
+  
+  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
+    return getCommittedTaskPath(getAppAttemptId(context), context, out);
+  }
+  
+  /**
+   * Compute the path where the output of a committed task is stored until the
+   * entire job is committed for a specific application attempt.
+   * @param appAttemptId the id of the application attempt to use
+   * @param context the context of any task.
+   * @return the path where the output of a committed task is stored.
+   */
+  private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
+    return new Path(getJobAttemptPath(appAttemptId),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+  
+  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
+    return new Path(getJobAttemptPath(appAttemptId, out),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+  
+  private static class CommittedTaskFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return !PENDING_DIR_NAME.equals(path.getName());
     }
   }
+  
+  /**
+   * Get a list of all paths where output from committed tasks are stored.
+   * @param context the context of the current job
+   * @return the list of these Paths/FileStatuses. 
+   * @throws IOException
+   */
+  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
+    throws IOException {
+    Path jobAttemptPath = getJobAttemptPath(context);
+    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
+    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
+  }
+  
+  /**
+   * Get the directory that the task should write results into.
+   * @return the work directory
+   * @throws IOException
+   */
+  public Path getWorkPath() throws IOException {
+    return workPath;
+  }
 
   /**
    * Create the temporary directory that is the root of all of the task 
@@ -79,116 +277,103 @@ public class FileOutputCommitter extends
    * @param context the job's context
    */
   public void setupJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + 
-    		  Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+    if (hasOutputPath()) {
+      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+      FileSystem fs = pendingJobAttemptsPath.getFileSystem(
+          context.getConfiguration());
+      if (!fs.mkdirs(pendingJobAttemptsPath)) {
+        LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
       }
-    }
-  }
-
-  // True if the job requires output.dir marked on successful job.
-  // Note that by default it is set to true.
-  private boolean shouldMarkOutputDir(Configuration conf) {
-    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
-  }
-  
-  // Create a _success file in the job's output dir
-  private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
-    if (outputPath != null) {
-      // create a file in the output folder to mark the job completion
-      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-      outputFileSystem.create(filePath).close();
+    } else {
+      LOG.warn("Output Path is null in setupJob()");
     }
   }
   
   /**
-   * Move all job output to the final place.
+   * The job has completed so move all committed tasks to the final output dir.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      //delete the task temp directory from the current jobtempdir
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    if (hasOutputPath()) {
+      Path finalOutput = getOutputPath();
+      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
+        mergePaths(fs, stat, finalOutput);
       }
 
-      //move the job output to final place
-      Path jobOutputPath = 
-          new Path(outputPath, getJobAttemptBaseDirName(context));
-      moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
-
       // delete the _temporary folder and create a _done file in the o/p folder
       cleanupJob(context);
-      if (shouldMarkOutputDir(context.getConfiguration())) {
-        markOutputDirSuccessful(context);
+      // True if the job requires output.dir marked on successful job.
+      // Note that by default it is set to true.
+      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fs.create(markerPath).close();
       }
+    } else {
+      LOG.warn("Output Path is null in commitJob()");
     }
   }
 
   /**
-   * Move job output to final location 
-   * @param fs Filesystem handle
-   * @param origJobOutputPath The original location of the job output
-   * Required to generate the relative path for correct moving of data. 
-   * @param finalOutputDir The final output directory to which the job output 
-   *                       needs to be moved
-   * @param jobOutput The current job output directory being moved 
-   * @throws IOException
-   */
-  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath, 
-      Path finalOutputDir, Path jobOutput) throws IOException {
-    LOG.debug("Told to move job output from " + jobOutput
-        + " to " + finalOutputDir + 
-        " and orig job output path is " + origJobOutputPath);    
-    if (fs.isFile(jobOutput)) {
-      Path finalOutputPath = 
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      if (!fs.rename(jobOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of job");
-        }
-        if (!fs.rename(jobOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of job");
-        }
-      }
-      LOG.debug("Moved job output file from " + jobOutput + " to " + 
-          finalOutputPath);
-    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
-      LOG.debug("Job output file " + jobOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath = 
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along job output path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
-        }
-      }
-    }
+   * Merge two paths together.  Anything in from will be moved into to, if there
+   * are any name conflicts while merging the files or directories in from win.
+   * @param fs the File System to use
+   * @param from the path data is coming from.
+   * @param to the path data is going to.
+   * @throws IOException on any error
+   */
+  private static void mergePaths(FileSystem fs, final FileStatus from,
+      final Path to)
+    throws IOException {
+     LOG.debug("Merging data from "+from+" to "+to);
+     if(from.isFile()) {
+       if(fs.exists(to)) {
+         if(!fs.delete(to, true)) {
+           throw new IOException("Failed to delete "+to);
+         }
+       }
+
+       if(!fs.rename(from.getPath(), to)) {
+         throw new IOException("Failed to rename "+from+" to "+to);
+       }
+     } else if(from.isDirectory()) {
+       if(fs.exists(to)) {
+         FileStatus toStat = fs.getFileStatus(to);
+         if(!toStat.isDirectory()) {
+           if(!fs.delete(to, true)) {
+             throw new IOException("Failed to delete "+to);
+           }
+           if(!fs.rename(from.getPath(), to)) {
+             throw new IOException("Failed to rename "+from+" to "+to);
+           }
+         } else {
+           //It is a directory so merge everything in the directories
+           for(FileStatus subFrom: fs.listStatus(from.getPath())) {
+             Path subTo = new Path(to, subFrom.getPath().getName());
+             mergePaths(fs, subFrom, subTo);
+           }
+         }
+       } else {
+         //it does not exist just rename
+         if(!fs.rename(from.getPath(), to)) {
+           throw new IOException("Failed to rename "+from+" to "+to);
+         }
+       }
+     }
   }
 
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      }
+    if (hasOutputPath()) {
+      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+      FileSystem fs = pendingJobAttemptsPath
+          .getFileSystem(context.getConfiguration());
+      fs.delete(pendingJobAttemptsPath, true);
     } else {
-      LOG.warn("Output Path is null in cleanup");
+      LOG.warn("Output Path is null in cleanupJob()");
     }
   }
 
@@ -217,69 +402,40 @@ public class FileOutputCommitter extends
    * Move the files from the work directory to the job output directory
    * @param context the task context
    */
+  @Override
   public void commitTask(TaskAttemptContext context) 
   throws IOException {
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    if (workPath != null) {
-      context.progress();
-      if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to the current job attempt output dir
-    	  Path jobOutputPath = 
-    	      new Path(outputPath, getJobAttemptBaseDirName(context));
-        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
-        // Delete the temporary task-specific output directory
-        if (!outputFileSystem.delete(workPath, true)) {
-          LOG.warn("Failed to delete the temporary output" + 
-          " directory of task: " + attemptId + " - " + workPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " + 
-            jobOutputPath);
-      }
-    }
+    commitTask(context, null);
   }
 
-  /**
-   * Move all of the files from the work directory to the final output
-   * @param context the task context
-   * @param fs the output file system
-   * @param jobOutputDir the final output direcotry
-   * @param taskOutput the work path
-   * @throws IOException
-   */
-  private void moveTaskOutputs(TaskAttemptContext context,
-                               FileSystem fs,
-                               Path jobOutputDir,
-                               Path taskOutput) 
+  @Private
+  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
-    context.progress();
-    LOG.debug("Told to move taskoutput from " + taskOutput
-        + " to " + jobOutputDir);    
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
-                                          workPath);
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " + 
-                                 attemptId);
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " + 
-        		  attemptId);
-        }
+    if (hasOutputPath()) {
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
       }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.getFileStatus(taskOutput).isDirectory()) {
-      LOG.debug("Taskoutput " + taskOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+      Path committedTaskPath = getCommittedTaskPath(context);
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if (fs.exists(taskAttemptPath)) {
+        if(fs.exists(committedTaskPath)) {
+          if(!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete " + committedTaskPath);
+          }
+        }
+        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + taskAttemptPath + " to "
+              + committedTaskPath);
         }
+        LOG.info("Saved output of task '" + attemptId + "' to " + 
+            committedTaskPath);
+      } else {
+        LOG.warn("No Output found for " + attemptId);
       }
+    } else {
+      LOG.warn("Output Path is null in commitTask()");
     }
   }
 
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends
    */
   @Override
   public void abortTask(TaskAttemptContext context) throws IOException {
-    if (workPath != null) { 
-      context.progress();
-      outputFileSystem.delete(workPath, true);
-    }
+    abortTask(context, null);
   }
 
-  /**
-   * Find the final name of a given output file, given the job output directory
-   * and the work directory.
-   * @param jobOutputDir the job's output directory
-   * @param taskOutput the specific task output file
-   * @param taskOutputPath the job's work directory
-   * @return the final path for the specific output file
-   * @throws IOException
-   */
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
-                            Path taskOutputPath) throws IOException {    
-    URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(), 
-        outputFileSystem.getWorkingDirectory()).toUri();
-    URI taskOutputPathUri = 
-        taskOutputPath.makeQualified(
-            outputFileSystem.getUri(),
-            outputFileSystem.getWorkingDirectory()).toUri();
-    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {
-      throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPathUri + " child = " + taskOutputUri);
-    }
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
+  @Private
+  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
+    if (hasOutputPath()) { 
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if(!fs.delete(taskAttemptPath, true)) {
+        LOG.warn("Could not delete "+taskAttemptPath);
+      }
     } else {
-      return jobOutputDir;
+      LOG.warn("Output Path is null in abortTask()");
     }
   }
 
@@ -331,16 +471,20 @@ public class FileOutputCommitter extends
   @Override
   public boolean needsTaskCommit(TaskAttemptContext context
                                  ) throws IOException {
-    return workPath != null && outputFileSystem.exists(workPath);
+    return needsTaskCommit(context, null);
   }
 
-  /**
-   * Get the directory that the task should write results into
-   * @return the work directory
-   * @throws IOException
-   */
-  public Path getWorkPath() throws IOException {
-    return workPath;
+  @Private
+  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
+    ) throws IOException {
+    if(hasOutputPath()) {
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      return fs.exists(taskAttemptPath);
+    }
+    return false;
   }
 
   @Override
@@ -351,44 +495,40 @@ public class FileOutputCommitter extends
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
-    context.progress();
-    Path jobOutputPath = 
-        new Path(outputPath, getJobAttemptBaseDirName(context));
-    int previousAttempt =         
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
-    if (previousAttempt < 0) {
-      throw new IOException ("Cannot recover task output for first attempt...");
-    }
+    if(hasOutputPath()) {
+      context.progress();
+      TaskAttemptID attemptId = context.getTaskAttemptID();
+      int previousAttempt = getAppAttemptId(context) - 1;
+      if (previousAttempt < 0) {
+        throw new IOException ("Cannot recover task output for first attempt...");
+      }
 
-    Path pathToRecover = 
-        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
-    LOG.debug("Trying to recover task from " + pathToRecover
-        + " into " + jobOutputPath);
-    if (outputFileSystem.exists(pathToRecover)) {
-      // Move the task outputs to their final place
-      moveJobOutputs(outputFileSystem, 
-          pathToRecover, jobOutputPath, pathToRecover);
-      LOG.info("Saved output of job to " + jobOutputPath);
+      Path committedTaskPath = getCommittedTaskPath(context);
+      Path previousCommittedTaskPath = getCommittedTaskPath(
+          previousAttempt, context);
+      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+      LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
+          + " into " + committedTaskPath);
+      if (fs.exists(previousCommittedTaskPath)) {
+        if(fs.exists(committedTaskPath)) {
+          if(!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete "+committedTaskPath);
+          }
+        }
+        //Rename can fail if the parent directory does not yet exist.
+        Path committedParent = committedTaskPath.getParent();
+        fs.mkdirs(committedParent);
+        if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + previousCommittedTaskPath +
+              " to " + committedTaskPath);
+        }
+        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+      } else {
+        LOG.warn(attemptId+" had no output to recover.");
+      }
+    } else {
+      LOG.warn("Output Path is null in recoverTask()");
     }
   }
-
-  protected static String getJobAttemptBaseDirName(JobContext context) {
-    int appAttemptId = 
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobAttemptBaseDirName(int appAttemptId) {
-    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + 
-      + appAttemptId;
-  }
-
-  protected static String getTaskAttemptBaseDirName(
-      TaskAttemptContext context) {
-	  return getJobAttemptBaseDirName(context) + Path.SEPARATOR + 
-	  FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-      "_" + context.getTaskAttemptID().toString();
-  }
 }

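The rewrite above replaces the recursive moveJobOutputs()/moveTaskOutputs() walks with a fixed layout: committed task output lives under ${output}/_temporary/${appAttemptId}, in-flight task attempts one more _temporary level down, and commitJob() merges the committed task directories into the output root via mergePaths(). A sketch of how the pieces of the path line up, using the now-public PENDING_DIR_NAME (output directory and task attempt id are illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class LayoutDemo {
  public static void main(String[] args) {
    Path out = new Path("/user/demo/output"); // hypothetical output dir
    // Pending job attempts all live under ${out}/_temporary.
    Path pending = new Path(out, FileOutputCommitter.PENDING_DIR_NAME);
    // Attempt 0 of the job; on AM restart the attempt id increments and
    // recoverTask() pulls committed tasks across from attempt - 1.
    Path jobAttempt = new Path(pending, String.valueOf(0));
    // In-flight task attempts nest one more _temporary level down.
    Path taskAttempt = new Path(
        new Path(jobAttempt, FileOutputCommitter.PENDING_DIR_NAME),
        "attempt_1234_0001_m_000000_0"); // illustrative task attempt id
    System.out.println(taskAttempt);
    // -> /user/demo/output/_temporary/0/_temporary/attempt_1234_0001_m_000000_0
  }
}
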
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Feb 10 01:49:08 2012
@@ -79,7 +79,17 @@ public class TokenCache {
     }
     obtainTokensForNamenodesInternal(credentials, ps, conf);
   }
-    
+
+  /**
+   * Remove jobtoken referrals which don't make sense in the context
+   * of the task execution.
+   *
+   * @param conf
+   */
+  public static void cleanUpTokenReferral(Configuration conf) {
+    conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+  }
+
   static void obtainTokensForNamenodesInternal(Credentials credentials,
       Path[] ps, Configuration conf) throws IOException {
     for(Path p: ps) {

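cleanUpTokenReferral() unsets the job-credentials-binary referral, which points at a file on the submitting host and is meaningless once a task runs on another node. A sketch of the effect (the file path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;

public class ReferralCleanup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
        "/submit-host/creds.bin"); // hypothetical submit-side path
    TokenCache.cleanUpTokenReferral(conf);
    // The referral is gone; tasks fall back to their localized credentials.
    System.out.println(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
  }
}
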
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Feb 10 01:49:08 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.mapred.RawKeyVa
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
@@ -68,7 +67,8 @@ public class MergeManager<K, V> {
   
   /* Maximum percentage of the in-memory limit that a single shuffle can 
    * consume*/ 
-  private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
+  private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
+    = 0.25f;
 
   private final TaskAttemptID reduceId;
   
@@ -169,12 +169,22 @@ public class MergeManager<K, V> {
  
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
 
+    final float singleShuffleMemoryLimitPercent =
+        jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
+            DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
     this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
+      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
     this.memToMemMergeOutputsThreshold = 
             jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
     this.mergeThreshold = (long)(this.memoryLimit * 
-                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_EPRCENT, 
+                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 
                                            0.90f));
     LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
              "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +

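The hard-coded 25% cap on a single shuffle segment becomes the default of the new mapreduce.reduce.shuffle.memory.limit.percent knob, and the constructor now rejects values outside (0, 1]. A sketch of setting the knob and applying the same guard (the memory budget number is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ShuffleLimit {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.15f);
    float pct = conf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.25f);
    // Same guard MergeManager now applies: the fraction must be in (0, 1].
    if (pct <= 0.0f || pct > 1.0f) {
      throw new IllegalArgumentException("Invalid shuffle memory limit: " + pct);
    }
    long memoryLimit = 512L * 1024 * 1024; // illustrative in-memory merge budget
    long maxSingleShuffleLimit = (long) (memoryLimit * pct);
    System.out.println(maxSingleShuffleLimit); // bytes one shuffle may hold
  }
}
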
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java Fri Feb 10 01:49:08 2012
@@ -345,7 +345,7 @@ public class CLI extends Configured impl
         LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
         LogDumper logDumper = new LogDumper();
         logDumper.setConf(getConf());
-        logDumper.dumpAContainersLogs(logParams.getApplicationId(),
+        exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
             logParams.getContainerId(), logParams.getNodeId(),
             logParams.getOwner());
         } catch (IOException e) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Feb 10 01:49:08 2012
@@ -355,7 +355,7 @@ public class ConfigUtil {
     Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent", 
       new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
     Configuration.addDeprecation("mapred.job.shuffle.merge.percent", 
-      new String[] {MRJobConfig.SHUFFLE_MERGE_EPRCENT});
+      new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
     Configuration.addDeprecation("mapred.max.reduce.failures.percent", 
       new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
     Configuration.addDeprecation("mapred.reduce.child.env", 

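For context, Configuration.addDeprecation aliases a legacy key onto its replacement, so a value set under either name is readable under the new one. A free-standing sketch of that mechanism; old.example.key and new.example.key are made-up names, not real Hadoop keys:

    import org.apache.hadoop.conf.Configuration;

    public class DeprecationSketch {
      public static void main(String[] args) {
        // Register the alias, as ConfigUtil does for the shuffle keys above.
        Configuration.addDeprecation("old.example.key",
            new String[] {"new.example.key"});
        Configuration conf = new Configuration(false);
        conf.set("old.example.key", "0.9");
        // The value surfaces under the new name; a deprecation warning is logged.
        System.out.println(conf.get("new.example.key"));
      }
    }
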
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Feb 10 01:49:08 2012
@@ -518,6 +518,13 @@
 </property>
 
 <property>
+  <name>mapreduce.reduce.shuffle.memory.limit.percent</name>
+  <value>0.25</value>
+  <description>Expert: Maximum percentage of the in-memory limit that a
+  single shuffle can consume</description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to
@@ -1248,4 +1255,25 @@
     heartbeats to the ResourceManager</description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
+  <value>1</value>
+  <description>The number of client retries to the AM - before reconnecting
+    to the RM to fetch Application Status.</description>
+</property>
+
+<!-- jobhistory properties -->
+
+<property>
+  <name>mapreduce.jobhistory.address</name>
+  <value>0.0.0.0:10020</value>
+  <description>MapReduce JobHistory Server host:port</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.webapp.address</name>
+  <value>0.0.0.0:19888</value>
+  <description>MapReduce JobHistory Server Web UI host:port</description>
+</property>
+
 </configuration>

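Clients pick these new endpoints up like any other setting. A small sketch, assuming JobConf's usual behavior of loading mapred-default.xml as a default resource; HistoryAddressSketch is an illustrative name:

    import org.apache.hadoop.mapred.JobConf;

    public class HistoryAddressSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf(); // pulls in mapred-default.xml
        System.out.println("JobHistory RPC:    "
            + conf.get("mapreduce.jobhistory.address", "0.0.0.0:10020"));
        System.out.println("JobHistory web UI: "
            + conf.get("mapreduce.jobhistory.webapp.address", "0.0.0.0:19888"));
      }
    }
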
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 10 01:49:08 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1237154
+/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1242632
 /hadoop/core/branches/branch-0.19/mapred/src/java/mapred-default.xml:713112
 /hadoop/core/trunk/src/mapred/mapred-default.xml:776175-785643

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java Fri Feb 10 01:49:08 2012
@@ -21,9 +21,13 @@ import static org.junit.Assert.assertEqu
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -35,6 +39,7 @@ import org.junit.Test;
 public class TestCounters {
   enum myCounters {TEST1, TEST2};
   private static final long MAX_VALUE = 10;
+  private static final Log LOG = LogFactory.getLog(TestCounters.class);
   
   // Generates enum based counters
   private Counters getEnumCounters(Enum[] keys) {
@@ -71,8 +76,6 @@ public class TestCounters {
     // Check for recovery from string
     assertEquals("Recovered counter does not match on content", 
                  counter, recoveredCounter);
-    assertEquals("recovered counter has wrong hash code",
-                 counter.hashCode(), recoveredCounter.hashCode());
   }
   
   @Test
@@ -132,23 +135,43 @@ public class TestCounters {
   
   @SuppressWarnings("deprecation")
   @Test
-  public void testLegacyNames() {
+  public void testReadWithLegacyNames() {
     Counters counters = new Counters();
     counters.incrCounter(TaskCounter.MAP_INPUT_RECORDS, 1);
     counters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
     counters.findCounter("file", FileSystemCounter.BYTES_READ).increment(1);
     
+    checkLegacyNames(counters);
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testWriteWithLegacyNames() {
+    Counters counters = new Counters();
+    counters.incrCounter(Task.Counter.MAP_INPUT_RECORDS, 1);
+    counters.incrCounter(JobInProgress.Counter.DATA_LOCAL_MAPS, 1);
+    counters.findCounter("FileSystemCounter", "FILE_BYTES_READ").increment(1);
+    
+    checkLegacyNames(counters);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void checkLegacyNames(Counters counters) {
     assertEquals("New name", 1, counters.findCounter(
         TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
     assertEquals("Legacy name", 1, counters.findCounter(
         "org.apache.hadoop.mapred.Task$Counter",
         "MAP_INPUT_RECORDS").getValue());
+    assertEquals("Legacy enum", 1,
+        counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
 
     assertEquals("New name", 1, counters.findCounter(
         JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
     assertEquals("Legacy name", 1, counters.findCounter(
         "org.apache.hadoop.mapred.JobInProgress$Counter",
         "DATA_LOCAL_MAPS").getValue());
+    assertEquals("Legacy enum", 1,
+        counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
 
     assertEquals("New name", 1, counters.findCounter(
         FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
@@ -159,6 +182,28 @@ public class TestCounters {
         "FILE_BYTES_READ").getValue());
   }
   
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testCounterIteratorConcurrency() {
+    Counters counters = new Counters();
+    counters.incrCounter("group1", "counter1", 1);
+    Iterator<Group> iterator = counters.iterator();
+    counters.incrCounter("group2", "counter2", 1);
+    iterator.next();
+  }
+  
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testGroupIteratorConcurrency() {
+    Counters counters = new Counters();
+    counters.incrCounter("group1", "counter1", 1);
+    Group group = counters.getGroup("group1");
+    Iterator<Counter> iterator = group.iterator();
+    counters.incrCounter("group1", "counter2", 1);
+    iterator.next();
+  }
+  
   public static void main(String[] args) throws IOException {
     new TestCounters().testCounters();
   }

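Both new concurrency tests mutate the counters after taking an iterator and then call next(); they can only pass if the counter and group iterators are weakly consistent rather than fail-fast. That is exactly the contract of the JDK's concurrent maps, as this standalone sketch (independent of the Hadoop classes) shows:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class WeaklyConsistentSketch {
      public static void main(String[] args) {
        ConcurrentSkipListMap<String, Long> counters =
            new ConcurrentSkipListMap<String, Long>();
        counters.put("counter1", 1L);
        Iterator<Map.Entry<String, Long>> it = counters.entrySet().iterator();
        // On a TreeMap this insertion would make it.next() throw
        // ConcurrentModificationException; here iteration proceeds.
        counters.put("counter2", 1L);
        while (it.hasNext()) {
          System.out.println(it.next().getKey());
        }
      }
    }
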
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -104,11 +104,12 @@ public class TestFileOutputCommitter ext
     writeOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd1 = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd1.exists());
     validateContent(jobTempDir1);        
     
     //now while running the second app attempt, 
@@ -119,14 +120,12 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter();
-    committer.setupJob(jContext2);
-    Path jobTempDir2 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(jContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
     
-    tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());
+    assertTrue(jtd2.exists());
     validateContent(jobTempDir2);
     
     committer2.commitJob(jContext2);
@@ -135,7 +134,8 @@ public class TestFileOutputCommitter ext
   }
 
   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    File fdir = new File(dir.toUri().getPath());
+    File expectedFile = new File(fdir, partFile);
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -190,7 +190,9 @@ public class TestFileOutputCommitter ext
     writeOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
     committer.commitJob(jContext);
 
     // validate output
@@ -216,7 +218,9 @@ public class TestFileOutputCommitter ext
     writeMapFileOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
     committer.commitJob(jContext);
 
     // validate output
@@ -224,6 +228,28 @@ public class TestFileOutputCommitter ext
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
   
+  public void testMapOnlyNoOutput() throws Exception {
+    JobConf conf = new JobConf();
+    //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();    
+    
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    
+    if(committer.needsTaskCommit(tContext)) {
+      // do commit
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+  
   public void testAbort() throws IOException, InterruptedException {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -244,21 +270,17 @@ public class TestFileOutputCommitter ext
 
     // do abort
     committer.abortTask(tContext);
-    FileSystem outputFileSystem = outDir.getFileSystem(conf);
-    Path workPath = new Path(outDir,
-        committer.getTaskAttemptBaseDirName(tContext))
-        .makeQualified(outputFileSystem);
-    File expectedFile = new File(new Path(workPath, partFile)
-        .toString());
+    File out = new File(outDir.toUri().getPath());
+    Path workPath = committer.getWorkPath(tContext, outDir);
+    File wp = new File(workPath.toUri().getPath());
+    File expectedFile = new File(wp, partFile);
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
-        .toString());
+    expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
     assertFalse("job temp dir still exists", expectedFile.exists());
-    assertEquals("Output directory not empty", 0, new File(outDir.toString())
-        .listFiles().length);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+    assertEquals("Output directory not empty", 0, out.listFiles().length);
+    FileUtil.fullyDelete(out);
   }
 
   public static class FakeFileSystem extends RawLocalFileSystem {

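The recurring change in this file is guarding commitTask behind needsTaskCommit, matching the OutputCommitter protocol: a task attempt with nothing to promote may skip the commit step entirely. A condensed sketch of the call order the tests now follow; runTask and its arguments are illustrative, with error handling omitted:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.OutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    public class CommitFlowSketch {
      static void runTask(OutputCommitter committer, JobContext jCtx,
          TaskAttemptContext tCtx) throws IOException {
        committer.setupJob(jCtx);
        committer.setupTask(tCtx);
        // ... the task writes its output to the work path here ...
        if (committer.needsTaskCommit(tCtx)) {
          committer.commitTask(tCtx); // promote this attempt's output
        }
        committer.commitJob(jCtx);    // finalize the job's output
      }
    }
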
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java Fri Feb 10 01:49:08 2012
@@ -21,7 +21,9 @@ package org.apache.hadoop.mapreduce;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -34,6 +36,7 @@ import java.io.StringReader;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.log4j.Layout;
@@ -88,6 +91,7 @@ public class TestJobMonitorAndPrint exte
         }
         ).when(job).getTaskCompletionEvents(anyInt(), anyInt());
 
+    doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
     when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);
     // setup the logger to capture all logs
     Layout layout =
@@ -106,21 +110,25 @@ public class TestJobMonitorAndPrint exte
     boolean foundHundred = false;
     boolean foundComplete = false;
     boolean foundUber = false;
-    String match_1 = "uber mode : true";
-    String match_2 = "map 100% reduce 100%";
-    String match_3 = "completed successfully";
+    String uberModeMatch = "uber mode : true";
+    String progressMatch = "map 100% reduce 100%";
+    String completionMatch = "completed successfully";
     while ((line = r.readLine()) != null) {
-      if (line.contains(match_1)) {
+      if (line.contains(uberModeMatch)) {
         foundUber = true;
       }
-      foundHundred = line.contains(match_2);      
+      foundHundred = line.contains(progressMatch);      
       if (foundHundred)
         break;
     }
     line = r.readLine();
-    foundComplete = line.contains(match_3);
+    foundComplete = line.contains(completionMatch);
     assertTrue(foundUber);
     assertTrue(foundHundred);
     assertTrue(foundComplete);
+
+    System.out.println("The output of job.toString() is : \n" + job.toString());
+    assertTrue(job.toString().contains("Number of maps: 5\n"));
+    assertTrue(job.toString().contains("Number of reduces: 5\n"));
   }
 }

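The new stub uses doReturn(...).when(...) together with an isA matcher: doReturn stubs without invoking the real method (which matters when stubbing spies), and isA constrains the argument by type. A tiny free-standing sketch of the pattern; the Probe interface is made up for illustration:

    import static org.mockito.Matchers.isA;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    public class StubSketch {
      interface Probe { int count(String label); }

      public static void main(String[] args) {
        Probe probe = mock(Probe.class);
        // Stub without calling any real implementation, for any String argument.
        doReturn(5).when(probe).count(isA(String.class));
        System.out.println(probe.count("maps")); // 5
      }
    }
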
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -60,6 +60,22 @@ public class TestFileOutputCommitter ext
   private Text val2 = new Text("val2");
 
   
+  private static void cleanup() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = outDir.getFileSystem(conf);
+    fs.delete(outDir, true);
+  }
+  
+  @Override
+  public void setUp() throws IOException {
+    cleanup();
+  }
+  
+  @Override
+  public void tearDown() throws IOException {
+    cleanup();
+  }
+  
   private void writeOutput(RecordWriter theRecordWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     NullWritable nullWritable = NullWritable.get();
@@ -114,11 +130,10 @@ public class TestFileOutputCommitter ext
 
     // do commit
     committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
-    validateContent(jobTempDir1);    
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd.exists());
+    validateContent(jtd);    
     
     //now while running the second app attempt, 
     //recover the task output from first attempt
@@ -128,15 +143,13 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
-    committer.setupJob(tContext2);
-    Path jobTempDir2 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(tContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());
     
-    tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
-    validateContent(jobTempDir2);
+    assertTrue(jtd2.exists());
+    validateContent(jtd2);
     
     committer2.commitJob(jContext2);
     validateContent(outDir);
@@ -144,7 +157,12 @@ public class TestFileOutputCommitter ext
   }
 
   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    validateContent(new File(dir.toUri().getPath()));
+  }
+  
+  private void validateContent(File dir) throws IOException {
+    File expectedFile = new File(dir, partFile);
+    assertTrue("Could not find "+expectedFile, expectedFile.exists());
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -259,7 +277,7 @@ public class TestFileOutputCommitter ext
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
         .toString());
     assertFalse("job temp dir still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -315,12 +333,10 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    File jobTmpDir = new File(new Path(outDir,
-        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
-        Path.SEPARATOR +
-        FileOutputCommitter.TEMP_DIR_NAME).toString());
-    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    Path jtd = committer.getJobAttemptPath(jContext);
+    File jobTmpDir = new File(jtd.toUri().getPath());
+    Path ttd = committer.getTaskAttemptPath(tContext);
+    File taskTmpDir = new File(ttd.toUri().getPath());
     File expectedFile = new File(taskTmpDir, partFile);
     assertTrue(expectedFile + " does not exists", expectedFile.exists());
 

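A pattern repeated across both committer test rewrites is converting a Hadoop Path to java.io.File through toUri().getPath() instead of toString(), so a scheme prefix such as file: cannot leak into the local filesystem path. A minimal sketch:

    import java.io.File;
    import org.apache.hadoop.fs.Path;

    public class PathToFileSketch {
      public static void main(String[] args) {
        Path p = new Path("file:/tmp/out/part-00000");
        // toString() keeps the scheme, yielding a bogus relative File.
        System.out.println(new File(p.toString()));        // file:/tmp/out/part-00000
        // toUri().getPath() strips it, yielding a usable absolute path.
        System.out.println(new File(p.toUri().getPath())); // /tmp/out/part-00000
      }
    }
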
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Feb 10 01:49:08 2012
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapreduce.security;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -158,4 +161,13 @@ public class TestTokenCache {
 
     return mockFs;
   }
+
+  @Test
+  public void testCleanUpTokenReferral() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, "foo");
+    TokenCache.cleanUpTokenReferral(conf);
+    assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
+  }
+  
 }

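The new test pins down what cleanUpTokenReferral must do: after it runs, the binary-credentials referral is gone from the configuration. It asserts behavior equivalent to the sketch below, which assumes the Configuration.unset API; the local cleanUpTokenReferral here mirrors, but is not, the real TokenCache method:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ReferralCleanupSketch {
      // What the test requires of TokenCache.cleanUpTokenReferral, in effect.
      static void cleanUpTokenReferral(Configuration conf) {
        conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, "foo");
        cleanUpTokenReferral(conf);
        // Prints null, matching the assertNull in the test above.
        System.out.println(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
      }
    }
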
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Feb 10 01:49:08 2012
@@ -249,8 +249,9 @@ public class CompletedJob implements org
     }
     
     if (historyFileAbsolute != null) {
+      JobHistoryParser parser = null;
       try {
-        JobHistoryParser parser =
+        parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
         jobInfo = parser.parse();
@@ -258,6 +259,12 @@ public class CompletedJob implements org
         throw new YarnException("Could not load history file "
             + historyFileAbsolute, e);
       }
+      IOException parseException = parser.getParseException(); 
+      if (parseException != null) {
+        throw new YarnException(
+            "Could not parse history file " + historyFileAbsolute, 
+            parseException);
+      }
     } else {
       throw new IOException("History file not found");
     }
@@ -321,9 +328,6 @@ public class CompletedJob implements org
   @Override
   public
       boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return true;
-    }
     Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
     AccessControlList jobACL = jobACLs.get(jobOperation);
     return aclsMgr.checkAccess(callerUGI, jobOperation, 

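The hunk above separates two failure modes: parse() throwing outright, and parse() returning partial data while recording a deferred error retrievable via getParseException(). A condensed sketch of the resulting control flow, outside the CompletedJob class; the load method name is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    public class HistoryLoadSketch {
      static JobInfo load(FileSystem fs, Path historyFile) throws IOException {
        JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
        JobInfo jobInfo = parser.parse(); // may return partial info for a damaged file
        IOException parseException = parser.getParseException();
        if (parseException != null) {
          // Surface the deferred error instead of silently serving partial data.
          throw new IOException("Could not parse " + historyFile, parseException);
        }
        return jobInfo;
      }
    }
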
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Fri Feb 10 01:49:08 2012
@@ -152,7 +152,7 @@ public class PartialJob implements org.a
 
   @Override
   public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
-    return false;
+    return true;
   }
   
   @Override