You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/02/10 02:49:30 UTC
svn commit: r1242635 [6/10] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ bin/ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -22,10 +22,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
@@ -33,6 +31,8 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import com.google.common.collect.Iterators;
+
/**
* An abstract class to provide common implementation of the
* generic counter group in both mapred and mapreduce package.
@@ -45,7 +45,8 @@ public abstract class AbstractCounterGro
private final String name;
private String displayName;
- private final Map<String, T> counters = Maps.newTreeMap();
+ private final ConcurrentMap<String, T> counters =
+ new ConcurrentSkipListMap<String, T>();
private final Limits limits;
public AbstractCounterGroup(String name, String displayName,
@@ -56,7 +57,7 @@ public abstract class AbstractCounterGro
}
@Override
- public synchronized String getName() {
+ public String getName() {
return name;
}
@@ -79,7 +80,7 @@ public abstract class AbstractCounterGro
@Override
public synchronized T addCounter(String counterName, String displayName,
long value) {
- String saveName = limits.filterCounterName(counterName);
+ String saveName = Limits.filterCounterName(counterName);
T counter = findCounterImpl(saveName, false);
if (counter == null) {
return addCounterImpl(saveName, displayName, value);
@@ -95,8 +96,10 @@ public abstract class AbstractCounterGro
}
@Override
- public T findCounter(String counterName, String displayName) {
- String saveName = limits.filterCounterName(counterName);
+ public synchronized T findCounter(String counterName, String displayName) {
+ // Take the lock so that two threads cannot both fail to find a counter
+ // and then both try to add the same counter.
+ String saveName = Limits.filterCounterName(counterName);
T counter = findCounterImpl(saveName, false);
if (counter == null) {
return addCounterImpl(saveName, displayName, 0);
@@ -105,11 +108,13 @@ public abstract class AbstractCounterGro
}
@Override
- public synchronized T findCounter(String counterName, boolean create) {
- return findCounterImpl(limits.filterCounterName(counterName), create);
+ public T findCounter(String counterName, boolean create) {
+ return findCounterImpl(Limits.filterCounterName(counterName), create);
}
- private T findCounterImpl(String counterName, boolean create) {
+ // Lock the object. Cannot simply use concurrent constructs on the counters
+ // data-structure (like putIfAbsent) because of localization, limits etc.
+ private synchronized T findCounterImpl(String counterName, boolean create) {
T counter = counters.get(counterName);
if (counter == null && create) {
String localized =
@@ -141,7 +146,7 @@ public abstract class AbstractCounterGro
protected abstract T newCounter();
@Override
- public synchronized Iterator<T> iterator() {
+ public Iterator<T> iterator() {
return counters.values().iterator();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java Fri Feb 10 01:49:08 2012
@@ -18,18 +18,18 @@
package org.apache.hadoop.mapreduce.counters;
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
@@ -39,7 +39,10 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
-import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
/**
* An abstract class to provide common implementation for the Counters
@@ -60,8 +63,10 @@ public abstract class AbstractCounters<C
* A cache from enum values to the associated counter.
*/
private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
- private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
- private Map<String, G> groups = Maps.newTreeMap(); // other groups
+ // framework & fs groups
+ private Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
+ // other groups
+ private Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
private final CounterGroupFactory<C, G> groupFactory;
// For framework counter serialization without strings
@@ -171,7 +176,8 @@ public abstract class AbstractCounters<C
@InterfaceAudience.Private
public synchronized C findCounter(String scheme, FileSystemCounter key) {
return ((FileSystemCounterGroup<C>) getGroup(
- FileSystemCounter.class.getName())).findCounter(scheme, key);
+ FileSystemCounter.class.getName()).getUnderlyingGroup()).
+ findCounter(scheme, key);
}
/**
@@ -213,7 +219,7 @@ public abstract class AbstractCounters<C
private String filterGroupName(String oldName) {
String newName = legacyMap.get(oldName);
if (newName == null) {
- return limits.filterGroupName(oldName);
+ return Limits.filterGroupName(oldName);
}
LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
return newName;
@@ -241,11 +247,11 @@ public abstract class AbstractCounters<C
WritableUtils.writeVInt(out, groupFactory.version());
WritableUtils.writeVInt(out, fgroups.size()); // framework groups first
for (G group : fgroups.values()) {
- if (group instanceof FrameworkCounterGroup<?, ?>) {
+ if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
group.write(out);
- } else if (group instanceof FileSystemCounterGroup<?>) {
+ } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
group.write(out);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java Fri Feb 10 01:49:08 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.counters;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
@@ -98,4 +99,11 @@ public interface CounterGroupBase<T exte
* @param rightGroup the group to be added to this group
*/
void incrAllCounters(CounterGroupBase<T> rightGroup);
+
+ @Private
+ /**
+ * Exposes the underlying group type if a facade.
+ * @return the underlying object that this object is wrapping up.
+ */
+ CounterGroupBase<T> getUnderlyingGroup();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -110,6 +110,11 @@ public abstract class FileSystemCounterG
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
+
+ @Override
+ public Counter getUnderlyingCounter() {
+ return this;
+ }
}
@Override
@@ -231,10 +236,10 @@ public abstract class FileSystemCounterG
@Override
@SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
- if (checkNotNull(other, "other group")
+ if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
for (Counter counter : other) {
- FSCounter c = (FSCounter) counter;
+ FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
findCounter(c.scheme, c.key) .increment(counter.getValue());
}
}
@@ -253,7 +258,7 @@ public abstract class FileSystemCounterG
for (Object counter : entry.getValue()) {
if (counter == null) continue;
@SuppressWarnings("unchecked")
- FSCounter c = (FSCounter) counter;
+ FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java Fri Feb 10 01:49:08 2012
@@ -18,21 +18,24 @@
package org.apache.hadoop.mapreduce.counters;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
-import static com.google.common.base.Preconditions.*;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
/**
* An abstract class to provide common implementation for the framework
* counter group in both mapred and mapreduce packages.
@@ -43,7 +46,8 @@ import org.apache.hadoop.mapreduce.util.
@InterfaceAudience.Private
public abstract class FrameworkCounterGroup<T extends Enum<T>,
C extends Counter> implements CounterGroupBase<C> {
-
+ private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
+
private final Class<T> enumClass; // for Enum.valueOf
private final Object[] counters; // local casts are OK and save a class ref
private String displayName = null;
@@ -95,6 +99,11 @@ public abstract class FrameworkCounterGr
public void readFields(DataInput in) throws IOException {
assert false : "shouldn't be called";
}
+
+ @Override
+ public Counter getUnderlyingCounter() {
+ return this;
+ }
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java Fri Feb 10 01:49:08 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
/**
* A generic counter implementation
@@ -101,4 +102,9 @@ public class GenericCounter extends Abst
public synchronized void increment(long incr) {
value += incr;
}
+
+ @Override
+ public Counter getUnderlyingCounter() {
+ return this;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/Limits.java Fri Feb 10 01:49:08 2012
@@ -42,11 +42,11 @@ public class Limits {
return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
}
- public String filterCounterName(String name) {
+ public static String filterCounterName(String name) {
return filterName(name, COUNTER_NAME_MAX);
}
- public String filterGroupName(String name) {
+ public static String filterGroupName(String name) {
return filterName(name, GROUP_NAME_MAX);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Feb 10 01:49:08 2012
@@ -24,8 +24,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,9 +55,13 @@ import org.apache.hadoop.yarn.api.record
@InterfaceStability.Unstable
public class JobHistoryParser {
+ private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
+
private final FSDataInputStream in;
- JobInfo info = null;
+ private JobInfo info = null;
+ private IOException parseException = null;
+
/**
* Create a job history parser for the given history file using the
* given file system
@@ -91,30 +98,58 @@ public class JobHistoryParser {
* The first invocation will populate the object, subsequent calls
* will return the already parsed object.
* The input stream is closed on return
+ *
+ * This api ignores partial records and stops parsing on encountering one.
+ * {@link #getParseException()} can be used to fetch the exception, if any.
+ *
* @return The populated jobInfo object
* @throws IOException
+ * @see #getParseException()
*/
public synchronized JobInfo parse() throws IOException {
+ return parse(new EventReader(in));
+ }
+
+ /**
+ * Only used for unit tests.
+ */
+ @Private
+ public synchronized JobInfo parse(EventReader reader) throws IOException {
if (info != null) {
return info;
}
- EventReader reader = new EventReader(in);
+ info = new JobInfo();
+ int eventCtr = 0;
HistoryEvent event;
- info = new JobInfo();
try {
while ((event = reader.getNextEvent()) != null) {
handleEvent(event);
- }
+ ++eventCtr;
+ }
+ } catch (IOException ioe) {
+ LOG.info("Caught exception parsing history file after " + eventCtr +
+ " events", ioe);
+ parseException = ioe;
} finally {
in.close();
}
return info;
}
- private void handleEvent(HistoryEvent event) throws IOException {
+ /**
+ * Get the parse exception, if any.
+ *
+ * @return the parse exception, if any
+ * @see #parse()
+ */
+ public synchronized IOException getParseException() {
+ return parseException;
+ }
+
+ private void handleEvent(HistoryEvent event) {
EventType type = event.getEventType();
switch (type) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -19,16 +19,16 @@
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
-import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskAttemptID;
/** An {@link OutputCommitter} that commits files specified
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
**/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
-
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
- /**
- * Temporary directory name
+ /**
+ * Name of directory where pending data is placed. Data that has not been
+ * committed yet.
*/
- protected static final String TEMP_DIR_NAME = "_temporary";
+ public static final String PENDING_DIR_NAME = "_temporary";
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
- static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
- private FileSystem outputFileSystem = null;
private Path outputPath = null;
private Path workPath = null;
/**
* Create a file output committer
- * @param outputPath the job's output path
+ * @param outputPath the job's output path, or null if you want the output
+ * committer to act as a noop.
* @param context the task's context
* @throws IOException
*/
public FileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
+ this(outputPath, (JobContext)context);
+ if (outputPath != null) {
+ workPath = getTaskAttemptPath(context, outputPath);
+ }
+ }
+
+ /**
+ * Create a file output committer
+ * @param outputPath the job's output path, or null if you want the output
+ * committer to act as a noop.
+ * @param context the job's context
+ * @throws IOException
+ */
+ @Private
+ public FileOutputCommitter(Path outputPath,
+ JobContext context) throws IOException {
if (outputPath != null) {
- this.outputPath = outputPath;
- outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
- workPath = new Path(outputPath,
- getTaskAttemptBaseDirName(context))
- .makeQualified(outputFileSystem);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ this.outputPath = fs.makeQualified(outputPath);
+ }
+ }
+
+ /**
+ * @return the path where final output of the job should be placed. This
+ * could also be considered the committed application attempt path.
+ */
+ private Path getOutputPath() {
+ return this.outputPath;
+ }
+
+ /**
+ * @return true if we have an output path set, else false.
+ */
+ private boolean hasOutputPath() {
+ return this.outputPath != null;
+ }
+
+ /**
+ * @return the path where the output of pending job attempts is
+ * stored.
+ */
+ private Path getPendingJobAttemptsPath() {
+ return getPendingJobAttemptsPath(getOutputPath());
+ }
+
+ /**
+ * Get the location of pending job attempts.
+ * @param out the base output directory.
+ * @return the location of pending job attempts.
+ */
+ private static Path getPendingJobAttemptsPath(Path out) {
+ return new Path(out, PENDING_DIR_NAME);
+ }
+
+ /**
+ * Get the Application Attempt Id for this job
+ * @param context the context to look in
+ * @return the Application Attempt Id for a given job.
+ */
+ private static int getAppAttemptId(JobContext context) {
+ return context.getConfiguration().getInt(
+ MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param context the context of the job. This is used to get the
+ * application attempt id.
+ * @return the path to store job attempt data.
+ */
+ public Path getJobAttemptPath(JobContext context) {
+ return getJobAttemptPath(context, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param context the context of the job. This is used to get the
+ * application attempt id.
+ * @param out the output path to place these in.
+ * @return the path to store job attempt data.
+ */
+ public static Path getJobAttemptPath(JobContext context, Path out) {
+ return getJobAttemptPath(getAppAttemptId(context), out);
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param appAttemptId the ID of the application attempt for this job.
+ * @return the path to store job attempt data.
+ */
+ private Path getJobAttemptPath(int appAttemptId) {
+ return getJobAttemptPath(appAttemptId, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param appAttemptId the ID of the application attempt for this job.
+ * @return the path to store job attempt data.
+ */
+ private static Path getJobAttemptPath(int appAttemptId, Path out) {
+ return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
+ }
+
+ /**
+ * Compute the path where the output of pending task attempts is stored.
+ * @param context the context of the job with pending tasks.
+ * @return the path where the output of pending task attempts is stored.
+ */
+ private Path getPendingTaskAttemptsPath(JobContext context) {
+ return getPendingTaskAttemptsPath(context, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of pending task attempts is stored.
+ * @param context the context of the job with pending tasks.
+ * @return the path where the output of pending task attempts is stored.
+ */
+ private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
+ return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
+ }
+
+ /**
+ * Compute the path where the output of a task attempt is stored until
+ * that task is committed.
+ *
+ * @param context the context of the task attempt.
+ * @return the path where a task attempt should be stored.
+ */
+ public Path getTaskAttemptPath(TaskAttemptContext context) {
+ return new Path(getPendingTaskAttemptsPath(context),
+ String.valueOf(context.getTaskAttemptID()));
+ }
+
+ /**
+ * Compute the path where the output of a task attempt is stored until
+ * that task is committed.
+ *
+ * @param context the context of the task attempt.
+ * @param out The output path to put things in.
+ * @return the path where a task attempt should be stored.
+ */
+ public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+ return new Path(getPendingTaskAttemptsPath(context, out),
+ String.valueOf(context.getTaskAttemptID()));
+ }
+
+ /**
+ * Compute the path where the output of a committed task is stored until
+ * the entire job is committed.
+ * @param context the context of the task attempt
+ * @return the path where the output of a committed task is stored until
+ * the entire job is committed.
+ */
+ public Path getCommittedTaskPath(TaskAttemptContext context) {
+ return getCommittedTaskPath(getAppAttemptId(context), context);
+ }
+
+ public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
+ return getCommittedTaskPath(getAppAttemptId(context), context, out);
+ }
+
+ /**
+ * Compute the path where the output of a committed task is stored until the
+ * entire job is committed for a specific application attempt.
+ * @param appAttemptId the id of the application attempt to use
+ * @param context the context of any task.
+ * @return the path where the output of a committed task is stored.
+ */
+ private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
+ return new Path(getJobAttemptPath(appAttemptId),
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
+ }
+
+ private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
+ return new Path(getJobAttemptPath(appAttemptId, out),
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
+ }
+
+ private static class CommittedTaskFilter implements PathFilter {
+ @Override
+ public boolean accept(Path path) {
+ return !PENDING_DIR_NAME.equals(path.getName());
}
}
+
+ /**
+ * Get a list of all paths where output from committed tasks is stored.
+ * @param context the context of the current job
+ * @return the list of these Paths/FileStatuses.
+ * @throws IOException
+ */
+ private FileStatus[] getAllCommittedTaskPaths(JobContext context)
+ throws IOException {
+ Path jobAttemptPath = getJobAttemptPath(context);
+ FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
+ return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
+ }
+
+ /**
+ * Get the directory that the task should write results into.
+ * @return the work directory
+ * @throws IOException
+ */
+ public Path getWorkPath() throws IOException {
+ return workPath;
+ }
/**
* Create the temporary directory that is the root of all of the task
@@ -79,116 +277,103 @@ public class FileOutputCommitter extends
* @param context the job's context
*/
public void setupJob(JobContext context) throws IOException {
- if (outputPath != null) {
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (!fileSys.mkdirs(tmpDir)) {
- LOG.error("Mkdirs failed to create " + tmpDir.toString());
+ if (hasOutputPath()) {
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+ FileSystem fs = pendingJobAttemptsPath.getFileSystem(
+ context.getConfiguration());
+ if (!fs.mkdirs(pendingJobAttemptsPath)) {
+ LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
}
- }
- }
-
- // True if the job requires output.dir marked on successful job.
- // Note that by default it is set to true.
- private boolean shouldMarkOutputDir(Configuration conf) {
- return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
- }
-
- // Create a _success file in the job's output dir
- private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
- if (outputPath != null) {
- // create a file in the output folder to mark the job completion
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
- outputFileSystem.create(filePath).close();
+ } else {
+ LOG.warn("Output Path is null in setupJob()");
}
}
/**
- * Move all job output to the final place.
+ * The job has completed so move all committed tasks to the final output dir.
* Delete the temporary directory, including all of the work directories.
* Create a _SUCCESS file to mark it as successful.
* @param context the job's context
*/
public void commitJob(JobContext context) throws IOException {
- if (outputPath != null) {
- //delete the task temp directory from the current jobtempdir
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
+ if (hasOutputPath()) {
+ Path finalOutput = getOutputPath();
+ FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+ for(FileStatus stat: getAllCommittedTaskPaths(context)) {
+ mergePaths(fs, stat, finalOutput);
}
- //move the job output to final place
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
-
// delete the _temporary folder and create a _SUCCESS file in the o/p folder
cleanupJob(context);
- if (shouldMarkOutputDir(context.getConfiguration())) {
- markOutputDirSuccessful(context);
+ // True if the job requires output.dir marked on successful job.
+ // Note that by default it is set to true.
+ if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+ Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ fs.create(markerPath).close();
}
+ } else {
+ LOG.warn("Output Path is null in commitJob()");
}
}
/**
- * Move job output to final location
- * @param fs Filesystem handle
- * @param origJobOutputPath The original location of the job output
- * Required to generate the relative path for correct moving of data.
- * @param finalOutputDir The final output directory to which the job output
- * needs to be moved
- * @param jobOutput The current job output directory being moved
- * @throws IOException
- */
- private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
- Path finalOutputDir, Path jobOutput) throws IOException {
- LOG.debug("Told to move job output from " + jobOutput
- + " to " + finalOutputDir +
- " and orig job output path is " + origJobOutputPath);
- if (fs.isFile(jobOutput)) {
- Path finalOutputPath =
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
- if (!fs.rename(jobOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of job");
- }
- if (!fs.rename(jobOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of job");
- }
- }
- LOG.debug("Moved job output file from " + jobOutput + " to " +
- finalOutputPath);
- } else if (fs.getFileStatus(jobOutput).isDirectory()) {
- LOG.debug("Job output file " + jobOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(jobOutput);
- Path finalOutputPath =
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along job output path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
- }
- }
- }
+ * Merge two paths together. Anything in from will be moved into to; if there
+ * are any name conflicts while merging, the files or directories in from win.
+ * @param fs the File System to use
+ * @param from the path data is coming from.
+ * @param to the path data is going to.
+ * @throws IOException on any error
+ */
+ private static void mergePaths(FileSystem fs, final FileStatus from,
+ final Path to)
+ throws IOException {
+ LOG.debug("Merging data from "+from+" to "+to);
+ if(from.isFile()) {
+ if(fs.exists(to)) {
+ if(!fs.delete(to, true)) {
+ throw new IOException("Failed to delete "+to);
+ }
+ }
+
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ } else if(from.isDirectory()) {
+ if(fs.exists(to)) {
+ FileStatus toStat = fs.getFileStatus(to);
+ if(!toStat.isDirectory()) {
+ if(!fs.delete(to, true)) {
+ throw new IOException("Failed to delete "+to);
+ }
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ } else {
+ //It is a directory so merge everything in the directories
+ for(FileStatus subFrom: fs.listStatus(from.getPath())) {
+ Path subTo = new Path(to, subFrom.getPath().getName());
+ mergePaths(fs, subFrom, subTo);
+ }
+ }
+ } else {
+ //it does not exist just rename
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ }
+ }
}
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
- if (outputPath != null) {
- Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- }
+ if (hasOutputPath()) {
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+ FileSystem fs = pendingJobAttemptsPath
+ .getFileSystem(context.getConfiguration());
+ fs.delete(pendingJobAttemptsPath, true);
} else {
- LOG.warn("Output Path is null in cleanup");
+ LOG.warn("Output Path is null in cleanupJob()");
}
}
@@ -217,69 +402,40 @@ public class FileOutputCommitter extends
* Move the files from the work directory to the job output directory
* @param context the task context
*/
+ @Override
public void commitTask(TaskAttemptContext context)
throws IOException {
- TaskAttemptID attemptId = context.getTaskAttemptID();
- if (workPath != null) {
- context.progress();
- if (outputFileSystem.exists(workPath)) {
- // Move the task outputs to the current job attempt output dir
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
- // Delete the temporary task-specific output directory
- if (!outputFileSystem.delete(workPath, true)) {
- LOG.warn("Failed to delete the temporary output" +
- " directory of task: " + attemptId + " - " + workPath);
- }
- LOG.info("Saved output of task '" + attemptId + "' to " +
- jobOutputPath);
- }
- }
+ commitTask(context, null);
}
- /**
- * Move all of the files from the work directory to the final output
- * @param context the task context
- * @param fs the output file system
- * @param jobOutputDir the final output direcotry
- * @param taskOutput the work path
- * @throws IOException
- */
- private void moveTaskOutputs(TaskAttemptContext context,
- FileSystem fs,
- Path jobOutputDir,
- Path taskOutput)
+ @Private
+ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
- context.progress();
- LOG.debug("Told to move taskoutput from " + taskOutput
- + " to " + jobOutputDir);
- if (fs.isFile(taskOutput)) {
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
- workPath);
- if (!fs.rename(taskOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of task: " +
- attemptId);
- }
- if (!fs.rename(taskOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of task: " +
- attemptId);
- }
+ if (hasOutputPath()) {
+ context.progress();
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
}
- LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
- } else if(fs.getFileStatus(taskOutput).isDirectory()) {
- LOG.debug("Taskoutput " + taskOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(taskOutput);
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+ Path committedTaskPath = getCommittedTaskPath(context);
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ if (fs.exists(taskAttemptPath)) {
+ if(fs.exists(committedTaskPath)) {
+ if(!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete " + committedTaskPath);
+ }
+ }
+ if(!fs.rename(taskAttemptPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + taskAttemptPath + " to "
+ + committedTaskPath);
}
+ LOG.info("Saved output of task '" + attemptId + "' to " +
+ committedTaskPath);
+ } else {
+ LOG.warn("No Output found for " + attemptId);
}
+ } else {
+ LOG.warn("Output Path is null in commitTask()");
}
}
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends
*/
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
- if (workPath != null) {
- context.progress();
- outputFileSystem.delete(workPath, true);
- }
+ abortTask(context, null);
}
- /**
- * Find the final name of a given output file, given the job output directory
- * and the work directory.
- * @param jobOutputDir the job's output directory
- * @param taskOutput the specific task output file
- * @param taskOutputPath the job's work directory
- * @return the final path for the specific output file
- * @throws IOException
- */
- private Path getFinalPath(Path jobOutputDir, Path taskOutput,
- Path taskOutputPath) throws IOException {
- URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
- outputFileSystem.getWorkingDirectory()).toUri();
- URI taskOutputPathUri =
- taskOutputPath.makeQualified(
- outputFileSystem.getUri(),
- outputFileSystem.getWorkingDirectory()).toUri();
- URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
- if (taskOutputUri == relativePath) {
- throw new IOException("Can not get the relative path: base = " +
- taskOutputPathUri + " child = " + taskOutputUri);
- }
- if (relativePath.getPath().length() > 0) {
- return new Path(jobOutputDir, relativePath.getPath());
+ @Private
+ public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
+ if (hasOutputPath()) {
+ context.progress();
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
+ }
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ if(!fs.delete(taskAttemptPath, true)) {
+ LOG.warn("Could not delete "+taskAttemptPath);
+ }
} else {
- return jobOutputDir;
+ LOG.warn("Output Path is null in abortTask()");
}
}
@@ -331,16 +471,20 @@ public class FileOutputCommitter extends
@Override
public boolean needsTaskCommit(TaskAttemptContext context
) throws IOException {
- return workPath != null && outputFileSystem.exists(workPath);
+ return needsTaskCommit(context, null);
}
- /**
- * Get the directory that the task should write results into
- * @return the work directory
- * @throws IOException
- */
- public Path getWorkPath() throws IOException {
- return workPath;
+ @Private
+ public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
+ ) throws IOException {
+ if(hasOutputPath()) {
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
+ }
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ return fs.exists(taskAttemptPath);
+ }
+ return false;
}
@Override
@@ -351,44 +495,40 @@ public class FileOutputCommitter extends
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
- context.progress();
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- int previousAttempt =
- context.getConfiguration().getInt(
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
- if (previousAttempt < 0) {
- throw new IOException ("Cannot recover task output for first attempt...");
- }
+ if(hasOutputPath()) {
+ context.progress();
+ TaskAttemptID attemptId = context.getTaskAttemptID();
+ int previousAttempt = getAppAttemptId(context) - 1;
+ if (previousAttempt < 0) {
+ throw new IOException ("Cannot recover task output for first attempt...");
+ }
- Path pathToRecover =
- new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
- LOG.debug("Trying to recover task from " + pathToRecover
- + " into " + jobOutputPath);
- if (outputFileSystem.exists(pathToRecover)) {
- // Move the task outputs to their final place
- moveJobOutputs(outputFileSystem,
- pathToRecover, jobOutputPath, pathToRecover);
- LOG.info("Saved output of job to " + jobOutputPath);
+ Path committedTaskPath = getCommittedTaskPath(context);
+ Path previousCommittedTaskPath = getCommittedTaskPath(
+ previousAttempt, context);
+ FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+ LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ + " into " + committedTaskPath);
+ if (fs.exists(previousCommittedTaskPath)) {
+ if(fs.exists(committedTaskPath)) {
+ if(!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete "+committedTaskPath);
+ }
+ }
+ //Rename can fail if the parent directory does not yet exist.
+ Path committedParent = committedTaskPath.getParent();
+ fs.mkdirs(committedParent);
+ if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + previousCommittedTaskPath +
+ " to " + committedTaskPath);
+ }
+ LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+ } else {
+ LOG.warn(attemptId+" had no output to recover.");
+ }
+ } else {
+ LOG.warn("Output Path is null in recoverTask()");
}
}
-
- protected static String getJobAttemptBaseDirName(JobContext context) {
- int appAttemptId =
- context.getConfiguration().getInt(
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
- return getJobAttemptBaseDirName(appAttemptId);
- }
-
- protected static String getJobAttemptBaseDirName(int appAttemptId) {
- return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- + appAttemptId;
- }
-
- protected static String getTaskAttemptBaseDirName(
- TaskAttemptContext context) {
- return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- "_" + context.getTaskAttemptID().toString();
- }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Feb 10 01:49:08 2012
@@ -79,7 +79,17 @@ public class TokenCache {
}
obtainTokensForNamenodesInternal(credentials, ps, conf);
}
-
+
+ /**
+ * Remove jobtoken referrals which don't make sense in the context
+ * of the task execution.
+ *
+ * @param conf
+ */
+ public static void cleanUpTokenReferral(Configuration conf) {
+ conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+ }
+
static void obtainTokensForNamenodesInternal(Credentials credentials,
Path[] ps, Configuration conf) throws IOException {
for(Path p: ps) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Feb 10 01:49:08 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.mapred.RawKeyVa
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
@@ -68,7 +67,8 @@ public class MergeManager<K, V> {
/* Maximum percentage of the in-memory limit that a single shuffle can
* consume*/
- private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
+ private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
+ = 0.25f;
private final TaskAttemptID reduceId;
@@ -169,12 +169,22 @@ public class MergeManager<K, V> {
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
+ final float singleShuffleMemoryLimitPercent =
+ jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
+ DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ if (singleShuffleMemoryLimitPercent <= 0.0f
+ || singleShuffleMemoryLimitPercent > 1.0f) {
+ throw new IllegalArgumentException("Invalid value for "
+ + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ + singleShuffleMemoryLimitPercent);
+ }
+
this.maxSingleShuffleLimit =
- (long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
+ (long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold =
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
this.mergeThreshold = (long)(this.memoryLimit *
- jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_EPRCENT,
+ jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
0.90f));
LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java Fri Feb 10 01:49:08 2012
@@ -345,7 +345,7 @@ public class CLI extends Configured impl
LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
LogDumper logDumper = new LogDumper();
logDumper.setConf(getConf());
- logDumper.dumpAContainersLogs(logParams.getApplicationId(),
+ exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
logParams.getContainerId(), logParams.getNodeId(),
logParams.getOwner());
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Feb 10 01:49:08 2012
@@ -355,7 +355,7 @@ public class ConfigUtil {
Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent",
new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.shuffle.merge.percent",
- new String[] {MRJobConfig.SHUFFLE_MERGE_EPRCENT});
+ new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
Configuration.addDeprecation("mapred.max.reduce.failures.percent",
new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
Configuration.addDeprecation("mapred.reduce.child.env",
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Feb 10 01:49:08 2012
@@ -518,6 +518,13 @@
</property>
<property>
+ <name>mapreduce.reduce.shuffle.memory.limit.percent</name>
+ <value>0.25</value>
+ <description>Expert: Maximum percentage of the in-memory limit that a
+ single shuffle can consume</description>
+</property>
+
+<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
<description>The percentage of memory -relative to the maximum heap size- to
@@ -1248,4 +1255,25 @@
heartbeats to the ResourceManager</description>
</property>
+<property>
+ <name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
+ <value>1</value>
+ <description>The number of client retries to the AM - before reconnecting
+ to the RM to fetch Application Status.</description>
+</property>
+
+<!-- jobhistory properties -->
+
+<property>
+ <name>mapreduce.jobhistory.address</name>
+ <value>0.0.0.0:10020</value>
+ <description>MapReduce JobHistory Server host:port</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.webapp.address</name>
+ <value>0.0.0.0:19888</value>
+ <description>MapReduce JobHistory Server Web UI host:port</description>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 10 01:49:08 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1237154
+/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1242632
/hadoop/core/branches/branch-0.19/mapred/src/java/mapred-default.xml:713112
/hadoop/core/trunk/src/mapred/mapred-default.xml:776175-785643
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java Fri Feb 10 01:49:08 2012
@@ -21,9 +21,13 @@ import static org.junit.Assert.assertEqu
import java.io.IOException;
import java.text.ParseException;
+import java.util.Iterator;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -35,6 +39,7 @@ import org.junit.Test;
public class TestCounters {
enum myCounters {TEST1, TEST2};
private static final long MAX_VALUE = 10;
+ private static final Log LOG = LogFactory.getLog(TestCounters.class);
// Generates enum based counters
private Counters getEnumCounters(Enum[] keys) {
@@ -71,8 +76,6 @@ public class TestCounters {
// Check for recovery from string
assertEquals("Recovered counter does not match on content",
counter, recoveredCounter);
- assertEquals("recovered counter has wrong hash code",
- counter.hashCode(), recoveredCounter.hashCode());
}
@Test
@@ -132,23 +135,43 @@ public class TestCounters {
@SuppressWarnings("deprecation")
@Test
- public void testLegacyNames() {
+ public void testReadWithLegacyNames() {
Counters counters = new Counters();
counters.incrCounter(TaskCounter.MAP_INPUT_RECORDS, 1);
counters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
counters.findCounter("file", FileSystemCounter.BYTES_READ).increment(1);
+ checkLegacyNames(counters);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testWriteWithLegacyNames() {
+ Counters counters = new Counters();
+ counters.incrCounter(Task.Counter.MAP_INPUT_RECORDS, 1);
+ counters.incrCounter(JobInProgress.Counter.DATA_LOCAL_MAPS, 1);
+ counters.findCounter("FileSystemCounter", "FILE_BYTES_READ").increment(1);
+
+ checkLegacyNames(counters);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void checkLegacyNames(Counters counters) {
assertEquals("New name", 1, counters.findCounter(
TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
+ assertEquals("Legacy enum", 1,
+ counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
assertEquals("New name", 1, counters.findCounter(
JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.JobInProgress$Counter",
"DATA_LOCAL_MAPS").getValue());
+ assertEquals("Legacy enum", 1,
+ counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
assertEquals("New name", 1, counters.findCounter(
FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
@@ -159,6 +182,28 @@ public class TestCounters {
"FILE_BYTES_READ").getValue());
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testCounterIteratorConcurrency() {
+ Counters counters = new Counters();
+ counters.incrCounter("group1", "counter1", 1);
+ Iterator<Group> iterator = counters.iterator();
+ counters.incrCounter("group2", "counter2", 1);
+ iterator.next();
+ }
+
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testGroupIteratorConcurrency() {
+ Counters counters = new Counters();
+ counters.incrCounter("group1", "counter1", 1);
+ Group group = counters.getGroup("group1");
+ Iterator<Counter> iterator = group.iterator();
+ counters.incrCounter("group1", "counter2", 1);
+ iterator.next();
+ }
+
public static void main(String[] args) throws IOException {
new TestCounters().testCounters();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -104,11 +104,12 @@ public class TestFileOutputCommitter ext
writeOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
- Path jobTempDir1 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir1.toString()).exists()));
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
+ Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+ File jtd1 = new File(jobTempDir1.toUri().getPath());
+ assertTrue(jtd1.exists());
validateContent(jobTempDir1);
//now while running the second app attempt,
@@ -119,14 +120,12 @@ public class TestFileOutputCommitter ext
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter();
- committer.setupJob(jContext2);
- Path jobTempDir2 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir2.toString()).exists()));
+ committer2.setupJob(jContext2);
+ Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
- tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
committer2.recoverTask(tContext2);
+ File jtd2 = new File(jobTempDir2.toUri().getPath());
+ assertTrue(jtd2.exists());
validateContent(jobTempDir2);
committer2.commitJob(jContext2);
@@ -135,7 +134,8 @@ public class TestFileOutputCommitter ext
}
private void validateContent(Path dir) throws IOException {
- File expectedFile = new File(new Path(dir, partFile).toString());
+ File fdir = new File(dir.toUri().getPath());
+ File expectedFile = new File(fdir, partFile);
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
@@ -190,7 +190,9 @@ public class TestFileOutputCommitter ext
writeOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
committer.commitJob(jContext);
// validate output
@@ -216,7 +218,9 @@ public class TestFileOutputCommitter ext
writeMapFileOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
committer.commitJob(jContext);
// validate output
@@ -224,6 +228,28 @@ public class TestFileOutputCommitter ext
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testMapOnlyNoOutput() throws Exception {
+ JobConf conf = new JobConf();
+ //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ if(committer.needsTaskCommit(tContext)) {
+ // do commit
+ committer.commitTask(tContext);
+ }
+ committer.commitJob(jContext);
+
+ // validate output
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
public void testAbort() throws IOException, InterruptedException {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
@@ -244,21 +270,17 @@ public class TestFileOutputCommitter ext
// do abort
committer.abortTask(tContext);
- FileSystem outputFileSystem = outDir.getFileSystem(conf);
- Path workPath = new Path(outDir,
- committer.getTaskAttemptBaseDirName(tContext))
- .makeQualified(outputFileSystem);
- File expectedFile = new File(new Path(workPath, partFile)
- .toString());
+ File out = new File(outDir.toUri().getPath());
+ Path workPath = committer.getWorkPath(tContext, outDir);
+ File wp = new File(workPath.toUri().getPath());
+ File expectedFile = new File(wp, partFile);
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
- expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
- .toString());
+ expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
assertFalse("job temp dir still exists", expectedFile.exists());
- assertEquals("Output directory not empty", 0, new File(outDir.toString())
- .listFiles().length);
- FileUtil.fullyDelete(new File(outDir.toString()));
+ assertEquals("Output directory not empty", 0, out.listFiles().length);
+ FileUtil.fullyDelete(out);
}
public static class FakeFileSystem extends RawLocalFileSystem {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java Fri Feb 10 01:49:08 2012
@@ -21,7 +21,9 @@ package org.apache.hadoop.mapreduce;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -34,6 +36,7 @@ import java.io.StringReader;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.log4j.Layout;
@@ -88,6 +91,7 @@ public class TestJobMonitorAndPrint exte
}
).when(job).getTaskCompletionEvents(anyInt(), anyInt());
+ doReturn(new TaskReport[5]).when(job).getTaskReports(isA(TaskType.class));
when(clientProtocol.getJobStatus(any(JobID.class))).thenReturn(jobStatus_1, jobStatus_2);
// setup the logger to capture all logs
Layout layout =
@@ -106,21 +110,25 @@ public class TestJobMonitorAndPrint exte
boolean foundHundred = false;
boolean foundComplete = false;
boolean foundUber = false;
- String match_1 = "uber mode : true";
- String match_2 = "map 100% reduce 100%";
- String match_3 = "completed successfully";
+ String uberModeMatch = "uber mode : true";
+ String progressMatch = "map 100% reduce 100%";
+ String completionMatch = "completed successfully";
while ((line = r.readLine()) != null) {
- if (line.contains(match_1)) {
+ if (line.contains(uberModeMatch)) {
foundUber = true;
}
- foundHundred = line.contains(match_2);
+ foundHundred = line.contains(progressMatch);
if (foundHundred)
break;
}
line = r.readLine();
- foundComplete = line.contains(match_3);
+ foundComplete = line.contains(completionMatch);
assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
+
+ System.out.println("The output of job.toString() is : \n" + job.toString());
+ assertTrue(job.toString().contains("Number of maps: 5\n"));
+ assertTrue(job.toString().contains("Number of reduces: 5\n"));
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Fri Feb 10 01:49:08 2012
@@ -60,6 +60,22 @@ public class TestFileOutputCommitter ext
private Text val2 = new Text("val2");
+ private static void cleanup() throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = outDir.getFileSystem(conf);
+ fs.delete(outDir, true);
+ }
+
+ @Override
+ public void setUp() throws IOException {
+ cleanup();
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ cleanup();
+ }
+
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
@@ -114,11 +130,10 @@ public class TestFileOutputCommitter ext
// do commit
committer.commitTask(tContext);
- Path jobTempDir1 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir1.toString()).exists()));
- validateContent(jobTempDir1);
+ Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+ File jtd = new File(jobTempDir1.toUri().getPath());
+ assertTrue(jtd.exists());
+ validateContent(jtd);
//now while running the second app attempt,
//recover the task output from first attempt
@@ -128,15 +143,13 @@ public class TestFileOutputCommitter ext
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
- committer.setupJob(tContext2);
- Path jobTempDir2 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir2.toString()).exists()));
+ committer2.setupJob(tContext2);
+ Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
+ File jtd2 = new File(jobTempDir2.toUri().getPath());
- tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
committer2.recoverTask(tContext2);
- validateContent(jobTempDir2);
+ assertTrue(jtd2.exists());
+ validateContent(jtd2);
committer2.commitJob(jContext2);
validateContent(outDir);
@@ -144,7 +157,12 @@ public class TestFileOutputCommitter ext
}
private void validateContent(Path dir) throws IOException {
- File expectedFile = new File(new Path(dir, partFile).toString());
+ validateContent(new File(dir.toUri().getPath()));
+ }
+
+ private void validateContent(File dir) throws IOException {
+ File expectedFile = new File(dir, partFile);
+ assertTrue("Could not find "+expectedFile, expectedFile.exists());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
@@ -259,7 +277,7 @@ public class TestFileOutputCommitter ext
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
- expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+ expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
.toString());
assertFalse("job temp dir still exists", expectedFile.exists());
assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -315,12 +333,10 @@ public class TestFileOutputCommitter ext
assertNotNull(th);
assertTrue(th instanceof IOException);
assertTrue(th.getMessage().contains("fake delete failed"));
- File jobTmpDir = new File(new Path(outDir,
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
- Path.SEPARATOR +
- FileOutputCommitter.TEMP_DIR_NAME).toString());
- File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+ Path jtd = committer.getJobAttemptPath(jContext);
+ File jobTmpDir = new File(jtd.toUri().getPath());
+ Path ttd = committer.getTaskAttemptPath(tContext);
+ File taskTmpDir = new File(ttd.toUri().getPath());
File expectedFile = new File(taskTmpDir, partFile);
assertTrue(expectedFile + " does not exists", expectedFile.exists());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Feb 10 01:49:08 2012
@@ -19,6 +19,8 @@
package org.apache.hadoop.mapreduce.security;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -158,4 +161,13 @@ public class TestTokenCache {
return mockFs;
}
+
+ @Test
+ public void testCleanUpTokenReferral() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, "foo");
+ TokenCache.cleanUpTokenReferral(conf);
+ assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Feb 10 01:49:08 2012
@@ -249,8 +249,9 @@ public class CompletedJob implements org
}
if (historyFileAbsolute != null) {
+ JobHistoryParser parser = null;
try {
- JobHistoryParser parser =
+ parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
jobInfo = parser.parse();
@@ -258,6 +259,12 @@ public class CompletedJob implements org
throw new YarnException("Could not load history file "
+ historyFileAbsolute, e);
}
+ IOException parseException = parser.getParseException();
+ if (parseException != null) {
+ throw new YarnException(
+ "Could not parse history file " + historyFileAbsolute,
+ parseException);
+ }
} else {
throw new IOException("History file not found");
}
@@ -321,9 +328,6 @@ public class CompletedJob implements org
@Override
public
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return true;
- }
Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
AccessControlList jobACL = jobACLs.get(jobOperation);
return aclsMgr.checkAccess(callerUGI, jobOperation,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1242635&r1=1242634&r2=1242635&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Fri Feb 10 01:49:08 2012
@@ -152,7 +152,7 @@ public class PartialJob implements org.a
@Override
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
- return false;
+ return true;
}
@Override