You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/11/29 00:27:34 UTC
svn commit: r1414995 - in
/hadoop/common/branches/MR-2454/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/a...
Author: tucu
Date: Wed Nov 28 23:27:32 2012
New Revision: 1414995
URL: http://svn.apache.org/viewvc?rev=1414995&view=rev
Log:
Reverting MAPREDUCE-4809 as the committed patch is incorrect
Modified:
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/CHANGES.txt Wed Nov 28 23:27:32 2012
@@ -1,10 +1,5 @@
Hadoop MapReduce Change Log
-Branch MR-2454
-
- MAPREDUCE-4809. Make internal classes required for MAPREDUCE-2454 to be
- java public. (Mariappan Asokan via acmurthy)
-
Trunk (Unreleased)
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Nov 28 23:27:32 2012
@@ -34,8 +34,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -56,6 +54,7 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -73,9 +72,7 @@ import org.apache.hadoop.util.StringInte
import org.apache.hadoop.util.StringUtils;
/** A Map task. */
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class MapTask extends Task {
+class MapTask extends Task {
/**
* The size of each record in the index file for the map-outputs.
*/
@@ -342,6 +339,10 @@ public class MapTask extends Task {
done(umbilical, reporter);
}
+ public Progress getSortPhase() {
+ return sortPhase;
+ }
+
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
throws IOException {
@@ -371,6 +372,22 @@ public class MapTask extends Task {
}
@SuppressWarnings("unchecked")
+ private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+ createSortingCollector(JobConf job, TaskReporter reporter)
+ throws IOException, ClassNotFoundException {
+ MapOutputCollector<KEY, VALUE> collector
+ = (MapOutputCollector<KEY, VALUE>)
+ ReflectionUtils.newInstance(
+ job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+ MapOutputBuffer.class, MapOutputCollector.class), job);
+ LOG.info("Map output collector class = " + collector.getClass().getName());
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
+ return collector;
+ }
+
+ @SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
final TaskSplitIndex splitIndex,
@@ -392,11 +409,14 @@ public class MapTask extends Task {
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
- MapOutputCollector collector = null;
+ MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
if (numReduceTasks > 0) {
- collector = new MapOutputBuffer(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
} else {
- collector = new DirectMapOutputCollector(umbilical, job, reporter);
+ collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
}
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -642,7 +662,7 @@ public class MapTask extends Task {
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
- collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -738,17 +758,6 @@ public class MapTask extends Task {
output.close(mapperContext);
}
- interface MapOutputCollector<K, V> {
-
- public void collect(K key, V value, int partition
- ) throws IOException, InterruptedException;
- public void close() throws IOException, InterruptedException;
-
- public void flush() throws IOException, InterruptedException,
- ClassNotFoundException;
-
- }
-
class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
@@ -756,14 +765,18 @@ public class MapTask extends Task {
private TaskReporter reporter = null;
- private final Counters.Counter mapOutputRecordCounter;
- private final Counters.Counter fileOutputByteCounter;
- private final List<Statistics> fsStats;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
+ private List<Statistics> fsStats;
+
+ public DirectMapOutputCollector() {
+ }
@SuppressWarnings("unchecked")
- public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
- JobConf job, TaskReporter reporter) throws IOException {
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ this.reporter = context.getReporter();
+ JobConf job = context.getJobConf();
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
@@ -818,27 +831,25 @@ public class MapTask extends Task {
}
}
- @InterfaceAudience.LimitedPrivate({"MapReduce"})
- @InterfaceStability.Unstable
- public static class MapOutputBuffer<K extends Object, V extends Object>
+ private class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
- final int partitions;
- final JobConf job;
- final TaskReporter reporter;
- final Class<K> keyClass;
- final Class<V> valClass;
- final RawComparator<K> comparator;
- final SerializationFactory serializationFactory;
- final Serializer<K> keySerializer;
- final Serializer<V> valSerializer;
- final CombinerRunner<K,V> combinerRunner;
- final CombineOutputCollector<K, V> combineCollector;
+ private int partitions;
+ private JobConf job;
+ private TaskReporter reporter;
+ private Class<K> keyClass;
+ private Class<V> valClass;
+ private RawComparator<K> comparator;
+ private SerializationFactory serializationFactory;
+ private Serializer<K> keySerializer;
+ private Serializer<V> valSerializer;
+ private CombinerRunner<K,V> combinerRunner;
+ private CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
- final CompressionCodec codec;
+ private CompressionCodec codec;
// k/v accounting
- final IntBuffer kvmeta; // metadata overlay on backing store
+ private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
@@ -862,15 +873,15 @@ public class MapTask extends Task {
private static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
- final int maxRec;
- final int softLimit;
+ private int maxRec;
+ private int softLimit;
boolean spillInProgress;;
int bufferRemaining;
volatile Throwable sortSpillException = null;
int numSpills = 0;
- final int minSpillsForCombine;
- final IndexedSorter sorter;
+ private int minSpillsForCombine;
+ private IndexedSorter sorter;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
@@ -878,12 +889,12 @@ public class MapTask extends Task {
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
- final FileSystem rfs;
+ private FileSystem rfs;
// Counters
- final Counters.Counter mapOutputByteCounter;
- final Counters.Counter mapOutputRecordCounter;
- final Counters.Counter fileOutputByteCounter;
+ private Counters.Counter mapOutputByteCounter;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
@@ -891,12 +902,23 @@ public class MapTask extends Task {
private int indexCacheMemoryLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
+ private MapTask mapTask;
+ private MapOutputFile mapOutputFile;
+ private Progress sortPhase;
+ private Counters.Counter spilledRecordsCounter;
+
+ public MapOutputBuffer() {
+ }
+
@SuppressWarnings("unchecked")
- public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException {
- this.job = job;
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ job = context.getJobConf();
+ reporter = context.getReporter();
+ mapTask = context.getMapTask();
+ mapOutputFile = mapTask.getMapOutputFile();
+ sortPhase = mapTask.getSortPhase();
+ spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
@@ -973,7 +995,7 @@ public class MapTask extends Task {
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
- combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+ combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
@@ -1124,6 +1146,10 @@ public class MapTask extends Task {
}
}
+ private TaskAttemptID getTaskID() {
+ return mapTask.getTaskID();
+ }
+
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
@@ -1496,7 +1522,7 @@ public class MapTask extends Task {
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
- reportFatalError(getTaskID(), lspillException, logMsg);
+ mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
}
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Wed Nov 28 23:27:32 2012
@@ -26,8 +26,6 @@ import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -36,9 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-class SpillRecord {
+public class SpillRecord {
/** Backing store */
private final ByteBuffer buf;
@@ -147,17 +143,3 @@ class SpillRecord {
}
}
-
-class IndexRecord {
- long startOffset;
- long rawLength;
- long partLength;
-
- public IndexRecord() { }
-
- public IndexRecord(long startOffset, long rawLength, long partLength) {
- this.startOffset = startOffset;
- this.rawLength = rawLength;
- this.partLength = partLength;
- }
-}
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Wed Nov 28 23:27:32 2012
@@ -584,9 +584,9 @@ abstract public class Task implements Wr
return status;
}
- @InterfaceAudience.LimitedPrivate({"MapReduce"})
+ @InterfaceAudience.Private
@InterfaceStability.Unstable
- public class TaskReporter
+ protected class TaskReporter
extends org.apache.hadoop.mapreduce.StatusReporter
implements Runnable, Reporter {
private TaskUmbilicalProtocol umbilical;
@@ -1466,9 +1466,9 @@ abstract public class Task implements Wr
return reducerContext;
}
- @InterfaceAudience.LimitedPrivate({"MapReduce"})
+ @InterfaceAudience.Private
@InterfaceStability.Unstable
- public static abstract class CombinerRunner<K,V> {
+ protected static abstract class CombinerRunner<K,V> {
protected final Counters.Counter inputCounter;
protected final JobConf job;
protected final TaskReporter reporter;
@@ -1486,13 +1486,13 @@ abstract public class Task implements Wr
* @param iterator the key/value pairs to use as input
* @param collector the output collector
*/
- public abstract void combine(RawKeyValueIterator iterator,
+ abstract void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException;
@SuppressWarnings("unchecked")
- public static <K,V>
+ static <K,V>
CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
@@ -1542,7 +1542,7 @@ abstract public class Task implements Wr
}
@SuppressWarnings("unchecked")
- public void combine(RawKeyValueIterator kvIter,
+ protected void combine(RawKeyValueIterator kvIter,
OutputCollector<K,V> combineCollector
) throws IOException {
Reducer<K,V,K,V> combiner =
@@ -1611,7 +1611,7 @@ abstract public class Task implements Wr
@SuppressWarnings("unchecked")
@Override
- public void combine(RawKeyValueIterator iterator,
+ void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException {
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Nov 28 23:27:32 2012
@@ -30,6 +30,9 @@ public interface MRJobConfig {
public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+ public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
+ = "mapreduce.job.map.output.collector.class";
+
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Wed Nov 28 23:27:32 2012
@@ -17,15 +17,9 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
/**
* An interface for reporting exceptions to other threads
*/
-
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public interface ExceptionReporter {
+interface ExceptionReporter {
void reportException(Throwable t);
}
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Wed Nov 28 23:27:32 2012
@@ -20,13 +20,9 @@ package org.apache.hadoop.mapreduce.task
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class MapHost {
+class MapHost {
public static enum State {
IDLE, // No map outputs available
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Wed Nov 28 23:27:32 2012
@@ -24,8 +24,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -35,9 +33,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public abstract class MapOutput<K,V> {
+class MapOutput<K,V> {
private static final Log LOG = LogFactory.getLog(MapOutput.class);
private static AtomicInteger ID = new AtomicInteger(0);
@@ -66,7 +62,7 @@ public abstract class MapOutput<K,V> {
private final boolean primaryMapOutput;
- MapOutput(caskAttemptID mapId, MergeManager<K,V> merger, long size,
+ MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
JobConf conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
throws IOException {
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Wed Nov 28 23:27:32 2012
@@ -59,7 +59,7 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings(value={"unchecked", "deprecation"})
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MergeManager<K, V> {
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Wed Nov 28 23:27:32 2012
@@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter {
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Wed Nov 28 23:27:32 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -27,9 +25,7 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class ShuffleClientMetrics implements Updater {
+class ShuffleClientMetrics implements Updater {
private MetricsRecord shuffleMetrics = null;
private int numFailedFetches = 0;
Modified: hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1414995&r1=1414994&r2=1414995&view=diff
==============================================================================
--- hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/MR-2454/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Nov 28 23:27:32 2012
@@ -928,4 +928,12 @@
<value>jhs/_HOST@REALM.TLD</value>
</property>
+<property>
+ <name>mapreduce.job.map.output.collector.class</name>
+ <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+ <description>
+ It defines the MapOutputCollector implementation to use.
+ </description>
+</property>
+
</configuration>