You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2012/12/17 00:27:29 UTC
svn commit: r1422716 - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/...
Author: suresh
Date: Sun Dec 16 23:27:26 2012
New Revision: 1422716
URL: http://svn.apache.org/viewvc?rev=1422716&view=rev
Log:
Merge trunk into branch-trunk-win
Added:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
- copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
- copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
- copied unchanged from r1422713, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed)
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1422281-1422713
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Sun Dec 16 23:27:26 2012
@@ -14,6 +14,8 @@ Trunk (Unreleased)
MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
(Avner BenHanoch via acmurthy)
+ MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -71,6 +73,9 @@ Trunk (Unreleased)
MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
(Brandon Li via suresh)
+ MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
+ (masokan via tucu)
+
BUG FIXES
MAPREDUCE-4356. [Rumen] Provide access to the method
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1422281-1422713
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1422281-1422713
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Sun Dec 16 23:27:26 2012
@@ -34,6 +34,8 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +56,7 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -71,7 +74,9 @@ import org.apache.hadoop.util.StringInte
import org.apache.hadoop.util.StringUtils;
/** A Map task. */
-class MapTask extends Task {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapTask extends Task {
/**
* The size of each record in the index file for the map-outputs.
*/
@@ -338,6 +343,10 @@ class MapTask extends Task {
done(umbilical, reporter);
}
+ public Progress getSortPhase() {
+ return sortPhase;
+ }
+
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
throws IOException {
@@ -367,6 +376,22 @@ class MapTask extends Task {
}
@SuppressWarnings("unchecked")
+ private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+ createSortingCollector(JobConf job, TaskReporter reporter)
+ throws IOException, ClassNotFoundException {
+ MapOutputCollector<KEY, VALUE> collector
+ = (MapOutputCollector<KEY, VALUE>)
+ ReflectionUtils.newInstance(
+ job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+ MapOutputBuffer.class, MapOutputCollector.class), job);
+ LOG.info("Map output collector class = " + collector.getClass().getName());
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
+ return collector;
+ }
+
+ @SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
final TaskSplitIndex splitIndex,
@@ -388,11 +413,14 @@ class MapTask extends Task {
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
- MapOutputCollector collector = null;
+ MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
if (numReduceTasks > 0) {
- collector = new MapOutputBuffer(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
} else {
- collector = new DirectMapOutputCollector(umbilical, job, reporter);
+ collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
}
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -638,7 +666,7 @@ class MapTask extends Task {
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
- collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -734,17 +762,6 @@ class MapTask extends Task {
output.close(mapperContext);
}
- interface MapOutputCollector<K, V> {
-
- public void collect(K key, V value, int partition
- ) throws IOException, InterruptedException;
- public void close() throws IOException, InterruptedException;
-
- public void flush() throws IOException, InterruptedException,
- ClassNotFoundException;
-
- }
-
class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
@@ -752,14 +769,18 @@ class MapTask extends Task {
private TaskReporter reporter = null;
- private final Counters.Counter mapOutputRecordCounter;
- private final Counters.Counter fileOutputByteCounter;
- private final List<Statistics> fsStats;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
+ private List<Statistics> fsStats;
+
+ public DirectMapOutputCollector() {
+ }
@SuppressWarnings("unchecked")
- public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
- JobConf job, TaskReporter reporter) throws IOException {
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ this.reporter = context.getReporter();
+ JobConf job = context.getJobConf();
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
@@ -814,25 +835,27 @@ class MapTask extends Task {
}
}
- private class MapOutputBuffer<K extends Object, V extends Object>
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
+ @InterfaceStability.Unstable
+ public static class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
- final int partitions;
- final JobConf job;
- final TaskReporter reporter;
- final Class<K> keyClass;
- final Class<V> valClass;
- final RawComparator<K> comparator;
- final SerializationFactory serializationFactory;
- final Serializer<K> keySerializer;
- final Serializer<V> valSerializer;
- final CombinerRunner<K,V> combinerRunner;
- final CombineOutputCollector<K, V> combineCollector;
+ private int partitions;
+ private JobConf job;
+ private TaskReporter reporter;
+ private Class<K> keyClass;
+ private Class<V> valClass;
+ private RawComparator<K> comparator;
+ private SerializationFactory serializationFactory;
+ private Serializer<K> keySerializer;
+ private Serializer<V> valSerializer;
+ private CombinerRunner<K,V> combinerRunner;
+ private CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
- final CompressionCodec codec;
+ private CompressionCodec codec;
// k/v accounting
- final IntBuffer kvmeta; // metadata overlay on backing store
+ private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
@@ -856,15 +879,15 @@ class MapTask extends Task {
private static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
- final int maxRec;
- final int softLimit;
+ private int maxRec;
+ private int softLimit;
boolean spillInProgress;;
int bufferRemaining;
volatile Throwable sortSpillException = null;
int numSpills = 0;
- final int minSpillsForCombine;
- final IndexedSorter sorter;
+ private int minSpillsForCombine;
+ private IndexedSorter sorter;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
@@ -872,12 +895,12 @@ class MapTask extends Task {
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
- final FileSystem rfs;
+ private FileSystem rfs;
// Counters
- final Counters.Counter mapOutputByteCounter;
- final Counters.Counter mapOutputRecordCounter;
- final Counters.Counter fileOutputByteCounter;
+ private Counters.Counter mapOutputByteCounter;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
@@ -885,12 +908,23 @@ class MapTask extends Task {
private int indexCacheMemoryLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
+ private MapTask mapTask;
+ private MapOutputFile mapOutputFile;
+ private Progress sortPhase;
+ private Counters.Counter spilledRecordsCounter;
+
+ public MapOutputBuffer() {
+ }
+
@SuppressWarnings("unchecked")
- public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException {
- this.job = job;
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ job = context.getJobConf();
+ reporter = context.getReporter();
+ mapTask = context.getMapTask();
+ mapOutputFile = mapTask.getMapOutputFile();
+ sortPhase = mapTask.getSortPhase();
+ spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
@@ -967,7 +1001,7 @@ class MapTask extends Task {
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
- combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+ combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
@@ -1118,6 +1152,10 @@ class MapTask extends Task {
}
}
+ private TaskAttemptID getTaskID() {
+ return mapTask.getTaskID();
+ }
+
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
@@ -1490,7 +1528,7 @@ class MapTask extends Task {
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
- reportFatalError(getTaskID(), lspillException, logMsg);
+ mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sun Dec 16 23:27:26 2012
@@ -26,6 +26,8 @@ import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,7 +36,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
-class SpillRecord {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class SpillRecord {
/** Backing store */
private final ByteBuffer buf;
@@ -143,17 +147,3 @@ class SpillRecord {
}
}
-
-class IndexRecord {
- long startOffset;
- long rawLength;
- long partLength;
-
- public IndexRecord() { }
-
- public IndexRecord(long startOffset, long rawLength, long partLength) {
- this.startOffset = startOffset;
- this.rawLength = rawLength;
- this.partLength = partLength;
- }
-}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sun Dec 16 23:27:26 2012
@@ -584,9 +584,9 @@ abstract public class Task implements Wr
return status;
}
- @InterfaceAudience.Private
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
- protected class TaskReporter
+ public class TaskReporter
extends org.apache.hadoop.mapreduce.StatusReporter
implements Runnable, Reporter {
private TaskUmbilicalProtocol umbilical;
@@ -1466,9 +1466,9 @@ abstract public class Task implements Wr
return reducerContext;
}
- @InterfaceAudience.Private
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
- protected static abstract class CombinerRunner<K,V> {
+ public static abstract class CombinerRunner<K,V> {
protected final Counters.Counter inputCounter;
protected final JobConf job;
protected final TaskReporter reporter;
@@ -1486,13 +1486,13 @@ abstract public class Task implements Wr
* @param iterator the key/value pairs to use as input
* @param collector the output collector
*/
- abstract void combine(RawKeyValueIterator iterator,
+ public abstract void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException;
@SuppressWarnings("unchecked")
- static <K,V>
+ public static <K,V>
CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
@@ -1542,7 +1542,7 @@ abstract public class Task implements Wr
}
@SuppressWarnings("unchecked")
- protected void combine(RawKeyValueIterator kvIter,
+ public void combine(RawKeyValueIterator kvIter,
OutputCollector<K,V> combineCollector
) throws IOException {
Reducer<K,V,K,V> combiner =
@@ -1611,7 +1611,7 @@ abstract public class Task implements Wr
@SuppressWarnings("unchecked")
@Override
- void combine(RawKeyValueIterator iterator,
+ public void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sun Dec 16 23:27:26 2012
@@ -30,6 +30,9 @@ public interface MRJobConfig {
public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+ public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
+ = "mapreduce.job.map.output.collector.class";
+
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Sun Dec 16 23:27:26 2012
@@ -17,9 +17,14 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* An interface for reporting exceptions to other threads
*/
-interface ExceptionReporter {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public interface ExceptionReporter {
void reportException(Throwable t);
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Sun Dec 16 23:27:26 2012
@@ -20,9 +20,14 @@ package org.apache.hadoop.mapreduce.task
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
import org.apache.hadoop.mapreduce.TaskAttemptID;
-class MapHost {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapHost {
public static enum State {
IDLE, // No map outputs available
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Sun Dec 16 23:27:26 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -33,7 +35,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-class MapOutput<K,V> {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapOutput<K,V> {
private static final Log LOG = LogFactory.getLog(MapOutput.class);
private static AtomicInteger ID = new AtomicInteger(0);
@@ -62,7 +66,7 @@ class MapOutput<K,V> {
private final boolean primaryMapOutput;
- MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
+ public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
JobConf conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
throws IOException {
@@ -87,7 +91,7 @@ class MapOutput<K,V> {
this.primaryMapOutput = primaryMapOutput;
}
- MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
+ public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
this.mapId = mapId;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Sun Dec 16 23:27:26 2012
@@ -59,7 +59,7 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings(value={"unchecked", "deprecation"})
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class MergeManager<K, V> {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Sun Dec 16 23:27:26 2012
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Sun Dec 16 23:27:26 2012
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -25,7 +28,9 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
-class ShuffleClientMetrics implements Updater {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class ShuffleClientMetrics implements Updater {
private MetricsRecord shuffleMetrics = null;
private int numFailedFetches = 0;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1422716&r1=1422715&r2=1422716&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sun Dec 16 23:27:26 2012
@@ -938,4 +938,12 @@
<value>jhs/_HOST@REALM.TLD</value>
</property>
+<property>
+ <name>mapreduce.job.map.output.collector.class</name>
+ <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+ <description>
+ It defines the MapOutputCollector implementation to use.
+ </description>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1422281-1422713