Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/07/25 01:11:17 UTC
svn commit: r1506774 - in /hadoop/common/branches/branch-1: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/
Author: tucu
Date: Wed Jul 24 23:11:17 2013
New Revision: 1506774
URL: http://svn.apache.org/r1506774
Log:
MAPREDUCE-2454. plugin for generic shuffle service. (avnerb via tucu)
Added:
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Wed Jul 24 23:11:17 2013
@@ -6,6 +6,8 @@ Release 1.3.0 - unreleased
NEW FEATURES
+ MAPREDUCE-2454. plugin for generic shuffle service. (avnerb via tucu)
+
IMPROVEMENTS
HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH
Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Wed Jul 24 23:11:17 2013
@@ -221,6 +221,24 @@
</property>
<property>
+ <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+ <value>org.apache.hadoop.mapred.ReduceTask$ReduceCopier</value>
+ <description>Name of the class whose instance will be used by the
+ reduce tasks of this job to send shuffle requests.
+ The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.shuffle.provider.plugin.classes</name>
+ <value>org.apache.hadoop.mapred.TaskTracker$DefaultShuffleProvider</value>
+ <description>A comma-separated list of classes that should be loaded as ShuffleProviderPlugin(s).
+ A ShuffleProviderPlugin can serve shuffle requests from reduce tasks.
+ Each class in the list must implement org.apache.hadoop.mapred.ShuffleProviderPlugin.
+ </description>
+</property>
+
+<property>
<name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
<value>5000</value>
<description>The interval, in milliseconds, for which the tasktracker waits
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Jul 24 23:11:17 2013
@@ -77,6 +77,7 @@ import org.apache.hadoop.mapred.Merger.S
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -94,7 +95,7 @@ import org.apache.hadoop.metrics2.lib.Me
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
/** A Reduce task. */
-class ReduceTask extends Task {
+public class ReduceTask extends Task {
static { // register a ctor
WritableFactories.setFactory
@@ -106,7 +107,6 @@ class ReduceTask extends Task {
private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
- private ReduceCopier reduceCopier;
private CompressionCodec codec;
@@ -379,16 +379,28 @@ class ReduceTask extends Task {
// Initialize the codec
codec = initCodec();
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
- reduceCopier = new ReduceCopier(umbilical, job, reporter);
- if (!reduceCopier.fetchOutputs()) {
- if(reduceCopier.mergeThrowable instanceof FSError) {
- throw (FSError)reduceCopier.mergeThrowable;
+ // loads the ShuffleConsumerPlugin named in the job configuration
+ // +++ NOTE: This code supports loading 3rd-party plugins at runtime +++
+ //
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ job.getClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, ReduceCopier.class, ShuffleConsumerPlugin.class);
+
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+ LOG.info(" Using ShuffleConsumerPlugin : " + shuffleConsumerPlugin);
+
+ ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(ReduceTask.this, umbilical, conf, reporter);
+ shuffleConsumerPlugin.init(context);
+
+ if (!shuffleConsumerPlugin.fetchOutputs()) {
+ if(shuffleConsumerPlugin.getMergeThrowable() instanceof FSError) {
+ throw (FSError)shuffleConsumerPlugin.getMergeThrowable();
}
throw new IOException("Task: " + getTaskID() +
- " - The reduce copier failed", reduceCopier.mergeThrowable);
+ " - The ShuffleConsumerPlugin " + clazz.getSimpleName() + " failed", shuffleConsumerPlugin.getMergeThrowable());
}
}
copyPhase.complete(); // copy is already complete
@@ -402,7 +414,7 @@ class ReduceTask extends Task {
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null)
- : reduceCopier.createKVIterator(job, rfs, reporter);
+ : shuffleConsumerPlugin.createKVIterator(job, rfs, reporter);
// free up the data structures
mapOutputFilesOnDisk.clear();
@@ -421,6 +433,9 @@ class ReduceTask extends Task {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
+ if (shuffleConsumerPlugin != null) {
+ shuffleConsumerPlugin.close();
+ }
done(umbilical, reporter);
}
@@ -658,11 +673,11 @@ class ReduceTask extends Task {
OTHER_ERROR
};
- class ReduceCopier<K, V> implements MRConstants {
+ public static class ReduceCopier<K, V> implements ShuffleConsumerPlugin, MRConstants {
/** Reference to the umbilical object */
private TaskUmbilicalProtocol umbilical;
- private final TaskReporter reporter;
+ private TaskReporter reporter;
/** Reference to the task object */
@@ -749,18 +764,18 @@ class ReduceTask extends Task {
/**
* When we accumulate maxInMemOutputs number of files in ram, we merge/spill
*/
- private final int maxInMemOutputs;
+ private int maxInMemOutputs;
/**
* Usage threshold for in-memory output accumulation.
*/
- private final float maxInMemCopyPer;
+ private float maxInMemCopyPer;
/**
* Maximum memory usage of map outputs to merge from memory into
* the reduce, in bytes.
*/
- private final long maxInMemReduce;
+ private long maxInMemReduce;
/**
* The threads for fetching the files.
@@ -810,7 +825,7 @@ class ReduceTask extends Task {
/**
* Maximum number of fetch failures before reducer aborts.
*/
- private final int abortFailureLimit;
+ private int abortFailureLimit;
/**
* Initial penalty time in ms for a fetch failure.
@@ -918,8 +933,8 @@ class ReduceTask extends Task {
ShuffleClientInstrumentation(JobConf conf) {
registry.tag("user", "User name", conf.getUser())
.tag("jobName", "Job name", conf.getJobName())
- .tag("jobId", "Job ID", ReduceTask.this.getJobID().toString())
- .tag("taskId", "Task ID", getTaskID().toString())
+ .tag("jobId", "Job ID", reduceTask.getJobID().toString())
+ .tag("taskId", "Task ID", reduceTask.getTaskID().toString())
.tag("sessionId", "Session ID", conf.getSessionId());
}
@@ -960,7 +975,7 @@ class ReduceTask extends Task {
private ShuffleClientInstrumentation createShuffleClientInstrumentation() {
return DefaultMetricsSystem.INSTANCE.register("ShuffleClientMetrics",
- "Shuffle input metrics", new ShuffleClientInstrumentation(conf));
+ "Shuffle input metrics", new ShuffleClientInstrumentation(reduceTask.conf));
}
/** Represents the result of an attempt to copy a map output */
@@ -1353,15 +1368,15 @@ class ReduceTask extends Task {
LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
StringUtils.stringifyException(e));
try {
- umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
+ umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), reduceTask.jvmContext);
} catch (IOException io) {
LOG.error("Could not notify TT of FSError: " +
StringUtils.stringifyException(io));
}
} catch (Throwable th) {
- String msg = getTaskID() + " : Map output copy failure : "
+ String msg = reduceTask.getTaskID() + " : Map output copy failure : "
+ StringUtils.stringifyException(th);
- reportFatalError(getTaskID(), th, msg);
+ reduceTask.reportFatalError(reduceTask.getTaskID(), th, msg);
}
}
@@ -1410,7 +1425,7 @@ class ReduceTask extends Task {
long bytes = mapOutput.compressedSize;
// lock the ReduceTask while we do the rename
- synchronized (ReduceTask.this) {
+ synchronized (reduceTask) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
@@ -1446,7 +1461,7 @@ class ReduceTask extends Task {
tmpMapOutput + " to " + filename);
}
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
FileStatus fileStatus = localFileSys.getFileStatus(filename);
CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
fileStatus, mapOutput.decompressedSize);
@@ -1469,7 +1484,7 @@ class ReduceTask extends Task {
*/
private void noteCopiedMapOutput(TaskID taskId) {
copiedMapOutputs.add(taskId);
- ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+ ramManager.setNumCopiedMapOutputs(reduceTask.numMaps - copiedMapOutputs.size());
}
/**
@@ -1689,7 +1704,7 @@ class ReduceTask extends Task {
}
IFileInputStream checksumIn =
- new IFileInputStream(input,compressedLength, conf);
+ new IFileInputStream(input,compressedLength, reduceTask.conf);
input = checksumIn;
@@ -1798,12 +1813,12 @@ class ReduceTask extends Task {
throws IOException {
// Find out a suitable location for the output on local-filesystem
Path localFilename =
- lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
- mapOutputLength, conf);
+ reduceTask.lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+ mapOutputLength, reduceTask.conf);
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
- conf, localFileSys.makeQualified(localFilename),
+ reduceTask.conf, localFileSys.makeQualified(localFilename),
mapOutputLength);
@@ -1870,9 +1885,9 @@ class ReduceTask extends Task {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
- String msg = getTaskID() + " : Failed in shuffle to disk :"
+ String msg = reduceTask.getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
- reportFatalError(getTaskID(), t, msg);
+ reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
}
mapOutput = null;
@@ -1894,7 +1909,7 @@ class ReduceTask extends Task {
throws IOException {
// get the task and the current classloader which will become the parent
- Task task = ReduceTask.this;
+ Task task = reduceTask;
ClassLoader parent = conf.getClassLoader();
// get the work directory which holds the elements we are dynamically
@@ -1925,16 +1940,16 @@ class ReduceTask extends Task {
URLClassLoader loader = new URLClassLoader(urls, parent);
conf.setClassLoader(loader);
}
-
- public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
- TaskReporter reporter
- )throws ClassNotFoundException, IOException {
+
+ @Override
+ public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
+ JobConf conf = context.getConf();
+ this.reporter = context.getReporter();
+ this.umbilical = context.getUmbilical();
+ this.reduceTask = context.getReduceTask();
configureClasspath(conf);
- this.reporter = reporter;
this.shuffleClientMetrics = createShuffleClientInstrumentation();
- this.umbilical = umbilical;
- this.reduceTask = ReduceTask.this;
this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
this.copyResults = new ArrayList<CopyResult>(100);
@@ -1942,22 +1957,22 @@ class ReduceTask extends Task {
this.maxInFlight = 4 * numCopiers;
Counters.Counter combineInputCounter =
reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
- this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
+ this.combinerRunner = CombinerRunner.create(conf, reduceTask.getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
combineCollector =
- new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf);
+ new CombineOutputCollector(reduceTask.reduceCombineOutputCounter, reporter, conf);
}
this.ioSortFactor = conf.getInt("io.sort.factor", 10);
- this.abortFailureLimit = Math.max(30, numMaps / 10);
+ this.abortFailureLimit = Math.max(30, reduceTask.numMaps / 10);
this.maxFetchFailuresBeforeReporting = conf.getInt(
"mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
- this.maxFailedUniqueFetches = Math.min(numMaps,
+ this.maxFailedUniqueFetches = Math.min(reduceTask.numMaps,
this.maxFailedUniqueFetches);
this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
this.maxInMemCopyPer =
@@ -1994,12 +2009,22 @@ class ReduceTask extends Task {
this.reportReadErrorImmediately =
conf.getBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
}
-
+
+ @Override
+ public Throwable getMergeThrowable() {
+ return mergeThrowable;
+ }
+
+ @Override
+ public void close(){
+ }
+
private boolean busyEnough(int numInFlight) {
return numInFlight > maxInFlight;
}
+ @Override
public boolean fetchOutputs() throws IOException {
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
@@ -2010,7 +2035,7 @@ class ReduceTask extends Task {
InMemFSMergeThread inMemFSMergeThread = null;
GetMapEventsThread getMapEventsThread = null;
- for (int i = 0; i < numMaps; i++) {
+ for (int i = 0; i < reduceTask.numMaps; i++) {
copyPhase.addPhase(); // add sub-phase per file
}
@@ -2018,7 +2043,7 @@ class ReduceTask extends Task {
// start all the copying threads
for (int i=0; i < numCopiers; i++) {
- MapOutputCopier copier = new MapOutputCopier(conf, reporter,
+ MapOutputCopier copier = new MapOutputCopier(reduceTask.conf, reporter,
reduceTask.getJobTokenSecret());
copiers.add(copier);
copier.start();
@@ -2042,7 +2067,7 @@ class ReduceTask extends Task {
long lastOutputTime = 0;
// loop until we get all required outputs
- while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
+ while (copiedMapOutputs.size() < reduceTask.numMaps && mergeThrowable == null) {
int numEventsAtStartOfScheduling;
synchronized (copyResultsOrNewEventsLock) {
numEventsAtStartOfScheduling = numEventsFetched;
@@ -2056,7 +2081,7 @@ class ReduceTask extends Task {
}
if (logNow) {
LOG.info(reduceTask.getTaskID() + " Need another "
- + (numMaps - copiedMapOutputs.size()) + " map output(s) "
+ + (reduceTask.numMaps - copiedMapOutputs.size()) + " map output(s) "
+ "where " + numInFlight + " is already in progress");
}
@@ -2215,15 +2240,15 @@ class ReduceTask extends Task {
if (cr.getSuccess()) { // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
- reduceShuffleBytes.increment(cr.getSize());
+ reduceTask.reduceShuffleBytes.increment(cr.getSize());
long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
+ float mbs = ((float)reduceTask.reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;
copyPhase.startNextPhase();
- copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
+ copyPhase.setStatus("copy (" + numCopied + " of " + reduceTask.numMaps
+ " at " +
mbpsFormat.format(transferRate) + " MB/s)");
@@ -2249,15 +2274,15 @@ class ReduceTask extends Task {
noFailedFetches =
(noFailedFetches == null) ? 1 : (noFailedFetches + 1);
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
- LOG.info("Task " + getTaskID() + ": Failed fetch #" +
+ LOG.info("Task " + reduceTask.getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
if (noFailedFetches >= abortFailureLimit) {
LOG.fatal(noFailedFetches + " failures downloading "
- + getTaskID() + ".");
- umbilical.shuffleError(getTaskID(),
+ + reduceTask.getTaskID() + ".");
+ umbilical.shuffleError(reduceTask.getTaskID(),
"Exceeded the abort failure limit;"
- + " bailing-out.", jvmContext);
+ + " bailing-out.", reduceTask.jvmContext);
}
checkAndInformJobTracker(noFailedFetches, mapTaskId,
@@ -2279,7 +2304,7 @@ class ReduceTask extends Task {
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
- (((float)numCopied / numMaps)
+ (((float)numCopied / reduceTask.numMaps)
>= MIN_REQUIRED_PROGRESS_PERCENT);
// check if the reducer is stalled for a long time
@@ -2300,15 +2325,15 @@ class ReduceTask extends Task {
// kill if not healthy and has insufficient progress
if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
- fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
+ fetchFailedMaps.size() == (reduceTask.numMaps - copiedMapOutputs.size()))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
LOG.fatal("Shuffle failed with too many fetch failures " +
"and insufficient progress!" +
- "Killing task " + getTaskID() + ".");
- umbilical.shuffleError(getTaskID(),
+ "Killing task " + reduceTask.getTaskID() + ".");
+ umbilical.shuffleError(reduceTask.getTaskID(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
- + " bailing-out.", jvmContext);
+ + " bailing-out.", reduceTask.jvmContext);
}
}
@@ -2347,9 +2372,9 @@ class ReduceTask extends Task {
}
// copiers are done, exit and notify the waiting merge threads
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
exitLocalFSMerge = true;
- mapOutputFilesOnDisk.notify();
+ reduceTask.mapOutputFilesOnDisk.notify();
}
ramManager.close();
@@ -2360,7 +2385,7 @@ class ReduceTask extends Task {
// Wait for the on-disk merge to complete
localFSMergerThread.join();
LOG.info("Interleaved on-disk merge complete: " +
- mapOutputFilesOnDisk.size() + " files left.");
+ reduceTask.mapOutputFilesOnDisk.size() + " files left.");
//wait for an ongoing merge (if it is in flight) to complete
inMemFSMergeThread.join();
@@ -2377,7 +2402,7 @@ class ReduceTask extends Task {
return false;
}
}
- return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
+ return mergeThrowable == null && copiedMapOutputs.size() == reduceTask.numMaps;
}
// Notify the JobTracker
@@ -2387,8 +2412,8 @@ class ReduceTask extends Task {
int failures, TaskAttemptID mapId, boolean readError) {
if ((reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- synchronized (ReduceTask.this) {
- taskStatus.addFetchFailedMap(mapId);
+ synchronized (reduceTask) {
+ reduceTask.taskStatus.addFetchFailedMap(mapId);
reporter.progress();
LOG.info("Failed to fetch map-output from " + mapId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
@@ -2442,15 +2467,16 @@ class ReduceTask extends Task {
* first merge pass. If not, then said outputs must be written to disk
* first.
*/
+ @Override
@SuppressWarnings("unchecked")
- private RawKeyValueIterator createKVIterator(
+ public RawKeyValueIterator createKVIterator(
JobConf job, FileSystem fs, Reporter reporter) throws IOException {
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
boolean keepInputs = job.getKeepFailedTaskFiles();
- final Path tmpDir = new Path(getTaskID().toString());
+ final Path tmpDir = new Path(reduceTask.getTaskID().toString());
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
@@ -2463,15 +2489,15 @@ class ReduceTask extends Task {
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
- ioSortFactor > mapOutputFilesOnDisk.size()) {
+ ioSortFactor > reduceTask.mapOutputFilesOnDisk.size()) {
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath =
- mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
+ reduceTask.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, reporter, spilledRecordsCounter, null);
+ tmpDir, comparator, reporter, reduceTask.spilledRecordsCounter, null);
Writer writer = new Writer(job, fs, outputPath,
- keyClass, valueClass, codec, null);
+ keyClass, valueClass, reduceTask.codec, null);
try {
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
@@ -2508,15 +2534,15 @@ class ReduceTask extends Task {
long onDiskBytes = inMemToDiskBytes;
long totalDecompressedBytes = inMemToDiskBytes;
- for (CompressAwareFileStatus filestatus : mapOutputFilesOnDisk) {
+ for (CompressAwareFileStatus filestatus : reduceTask.mapOutputFilesOnDisk) {
long len = filestatus.getLen();
onDiskBytes += len;
diskSegments.add(new Segment<K, V>(job, fs, filestatus.getPath(),
- codec, keepInputs, filestatus.getDecompressedSize()));
+ reduceTask.codec, keepInputs, filestatus.getDecompressedSize()));
totalDecompressedBytes += (filestatus.getDecompressedSize() > 0) ? filestatus
.getDecompressedSize() : len;
}
- LOG.info("Merging " + mapOutputFilesOnDisk.size() + " files, " +
+ LOG.info("Merging " + reduceTask.mapOutputFilesOnDisk.size() + " files, " +
onDiskBytes + " bytes from disk");
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
@@ -2537,9 +2563,9 @@ class ReduceTask extends Task {
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, codec, diskSegments,
+ job, fs, keyClass, valueClass, reduceTask.codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- reporter, false, spilledRecordsCounter, null);
+ reporter, false, reduceTask.spilledRecordsCounter, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -2549,7 +2575,7 @@ class ReduceTask extends Task {
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, reporter, spilledRecordsCounter, null);
+ comparator, reporter, reduceTask.spilledRecordsCounter, null);
}
class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2558,7 +2584,7 @@ class ReduceTask extends Task {
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
throws IOException {
- super(null, null, size, null, spilledRecordsCounter);
+ super(null, null, size, null, reduceTask.spilledRecordsCounter);
this.kvIter = kvIter;
}
@@ -2621,9 +2647,9 @@ class ReduceTask extends Task {
}
private void addToMapOutputFilesOnDisk(CompressAwareFileStatus status) {
- synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(status);
- mapOutputFilesOnDisk.notify();
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
+ reduceTask.mapOutputFilesOnDisk.add(status);
+ reduceTask.mapOutputFilesOnDisk.notify();
}
}
@@ -2647,11 +2673,11 @@ class ReduceTask extends Task {
try {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
while(!exitLocalFSMerge){
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
while (!exitLocalFSMerge &&
- mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
+ reduceTask.mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
- mapOutputFilesOnDisk.wait();
+ reduceTask.mapOutputFilesOnDisk.wait();
}
}
if(exitLocalFSMerge) {//to avoid running one extra time in the end
@@ -2662,15 +2688,15 @@ class ReduceTask extends Task {
int bytesPerSum =
reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
LOG.info(reduceTask.getTaskID() + "We have " +
- mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+ reduceTask.mapOutputFilesOnDisk.size() + " map outputs on disk. " +
"Triggering merge of " + ioSortFactor + " files");
// 1. Prepare the list of files to be merged. This list is prepared
// using a list of map output files on disk. Currently we merge
// io.sort.factor files into 1.
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
for (int i = 0; i < ioSortFactor; ++i) {
- FileStatus filestatus = mapOutputFilesOnDisk.first();
- mapOutputFilesOnDisk.remove(filestatus);
+ FileStatus filestatus = reduceTask.mapOutputFilesOnDisk.first();
+ reduceTask.mapOutputFilesOnDisk.remove(filestatus);
mapFiles.add(filestatus.getPath());
approxOutputSize += filestatus.getLen();
}
@@ -2688,27 +2714,27 @@ class ReduceTask extends Task {
// 2. Start the on-disk merge process
Path outputPath =
- lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
- approxOutputSize, conf)
- .suffix(".merged");
- Writer writer =
- new Writer(conf,rfs, outputPath,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec, null);
- RawKeyValueIterator iter = null;
- Path tmpDir = new Path(reduceTask.getTaskID().toString());
- long decompressedBytesWritten;
- try {
- iter = Merger.merge(conf, rfs,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec, mapFiles.toArray(new Path[mapFiles.size()]),
- true, ioSortFactor, tmpDir,
- conf.getOutputKeyComparator(), reporter,
- spilledRecordsCounter, null);
-
- Merger.writeFile(iter, writer, reporter, conf);
+ reduceTask.lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
+ approxOutputSize, reduceTask.conf)
+ .suffix(".merged");
+ Writer writer =
+ new Writer(reduceTask.conf,rfs, outputPath,
+ reduceTask.conf.getMapOutputKeyClass(),
+ reduceTask.conf.getMapOutputValueClass(),
+ reduceTask.codec, null);
+ RawKeyValueIterator iter = null;
+ Path tmpDir = new Path(reduceTask.getTaskID().toString());
+ long decompressedBytesWritten;
+ try {
+ iter = Merger.merge(reduceTask.conf, rfs,
+ reduceTask.conf.getMapOutputKeyClass(),
+ reduceTask.conf.getMapOutputValueClass(),
+ reduceTask.codec, mapFiles.toArray(new Path[mapFiles.size()]),
+ true, ioSortFactor, tmpDir,
+ reduceTask.conf.getOutputKeyComparator(), reporter,
+ reduceTask.spilledRecordsCounter, null);
+
+ Merger.writeFile(iter, writer, reporter, reduceTask.conf);
writer.close();
decompressedBytesWritten = writer.decompressedBytesWritten;
} catch (Exception e) {
@@ -2716,7 +2742,7 @@ class ReduceTask extends Task {
throw new IOException (StringUtils.stringifyException(e));
}
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
FileStatus fileStatus = localFileSys.getFileStatus(outputPath);
CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
fileStatus, decompressedBytesWritten);
@@ -2738,9 +2764,9 @@ class ReduceTask extends Task {
mergeThrowable = e;
}
} catch (Throwable t) {
- String msg = getTaskID() + " : Failed to merge on the local FS"
+ String msg = reduceTask.getTaskID() + " : Failed to merge on the local FS"
+ StringUtils.stringifyException(t);
- reportFatalError(getTaskID(), t, msg);
+ reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
}
}
}
@@ -2768,9 +2794,9 @@ class ReduceTask extends Task {
+ StringUtils.stringifyException(e));
ReduceCopier.this.mergeThrowable = e;
} catch (Throwable t) {
- String msg = getTaskID() + " : Failed to merge in memory"
+ String msg = reduceTask.getTaskID() + " : Failed to merge in memory"
+ StringUtils.stringifyException(t);
- reportFatalError(getTaskID(), t, msg);
+ reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
}
}
@@ -2796,29 +2822,29 @@ class ReduceTask extends Task {
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
- mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
+ reduceTask.mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
- new Writer(conf, rfs, outputPath,
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(),
- codec, null);
+ new Writer(reduceTask.conf, rfs, outputPath,
+ reduceTask.conf.getMapOutputKeyClass(),
+ reduceTask.conf.getMapOutputValueClass(),
+ reduceTask.codec, null);
long decompressedBytesWritten;
RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
- rIter = Merger.merge(conf, rfs,
- (Class<K>)conf.getMapOutputKeyClass(),
- (Class<V>)conf.getMapOutputValueClass(),
+ rIter = Merger.merge(reduceTask.conf, rfs,
+ (Class<K>)reduceTask.conf.getMapOutputKeyClass(),
+ (Class<V>)reduceTask.conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
- conf.getOutputKeyComparator(), reporter,
- spilledRecordsCounter, null);
+ reduceTask.conf.getOutputKeyComparator(), reporter,
+ reduceTask.spilledRecordsCounter, null);
if (combinerRunner == null) {
- Merger.writeFile(rIter, writer, reporter, conf);
+ Merger.writeFile(rIter, writer, reporter, reduceTask.conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
@@ -2843,7 +2869,7 @@ class ReduceTask extends Task {
FileStatus status = localFileSys.getFileStatus(outputPath);
CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
status, decompressedBytesWritten);
- synchronized (mapOutputFilesOnDisk) {
+ synchronized (reduceTask.mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(compressedFileStatus);
}
}
@@ -2891,7 +2917,7 @@ class ReduceTask extends Task {
String msg = reduceTask.getTaskID()
+ " GetMapEventsThread Ignoring exception : "
+ StringUtils.stringifyException(t);
- reportFatalError(getTaskID(), t, msg);
+ reduceTask.reportFatalError(reduceTask.getTaskID(), t, msg);
}
} while (!exitGetMapEvents);
@@ -2912,7 +2938,7 @@ class ReduceTask extends Task {
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
fromEventId.get(),
MAX_EVENTS_TO_FETCH,
- reduceTask.getTaskID(), jvmContext);
+ reduceTask.getTaskID(), reduceTask.jvmContext);
TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
// Check if the reset is required.
@@ -2950,7 +2976,7 @@ class ReduceTask extends Task {
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
- "&reduce=" + getPartition());
+ "&reduce=" + reduceTask.getPartition());
List<MapOutputLocation> loc = mapLocations.get(host);
if (loc == null) {
loc = Collections.synchronizedList
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * ShuffleConsumerPlugin for serving reducers. It may shuffle map output files (MOFs)
+ * from either the built-in provider (MapOutputServlet) or from a 3rd-party ShuffleProviderPlugin.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("MapReduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin {
+
+ /**
+ * Initialize this instance after it has been created by the factory.
+ */
+ public void init(Context context) throws ClassNotFoundException, IOException;
+
+ /**
+ * Fetch the output of mappers from the TaskTrackers.
+ * @return true iff successful. In case of failure, an appropriate Throwable may be available through getMergeThrowable()
+ */
+ public boolean fetchOutputs() throws IOException;
+
+ /**
+ * @return a reference to a Throwable object (if the merge threw an exception)
+ */
+ public Throwable getMergeThrowable();
+
+ /**
+ * Create a RawKeyValueIterator from copied map outputs.
+ *
+ * The iterator returned must satisfy the following constraints:
+ * 1. Fewer than io.sort.factor files may be sources
+ * 2. No more than maxInMemReduce bytes of map outputs may be resident
+ * in memory when the reduce begins
+ *
+ * If we must perform an intermediate merge to satisfy (1), then we can
+ * keep the excluded outputs from (2) in memory and include them in the
+ * first merge pass. If not, then said outputs must be written to disk
+ * first.
+ */
+ public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException;
+
+ /**
+ * Close and clean up any resources associated with this object.
+ */
+ public void close();
+
+ @InterfaceAudience.LimitedPrivate("MapReduce")
+ @InterfaceStability.Unstable
+ public static class Context {
+ private final ReduceTask reduceTask;
+ private final TaskUmbilicalProtocol umbilical;
+ private final JobConf conf;
+ private final TaskReporter reporter;
+
+ public Context(ReduceTask reduceTask, TaskUmbilicalProtocol umbilical, JobConf conf, TaskReporter reporter){
+ this.reduceTask = reduceTask;
+ this.umbilical = umbilical;
+ this.conf = conf;
+ this.reporter = reporter;
+ }
+
+ public ReduceTask getReduceTask() {
+ return reduceTask;
+ }
+ public JobConf getConf() {
+ return conf;
+ }
+ public TaskUmbilicalProtocol getUmbilical() {
+ return umbilical;
+ }
+ public TaskReporter getReporter() {
+ return reporter;
+ }
+ }
+}
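[Editorial note: to make the contract above concrete, the following is a hedged sketch of a 3rd-party consumer plugin. The com.example package and everything beyond the interface signatures are hypothetical, not part of this commit.]

  package com.example;

  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RawKeyValueIterator;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.ShuffleConsumerPlugin;

  public class MyConsumerPlugin implements ShuffleConsumerPlugin {
    private Throwable mergeThrowable;

    public void init(ShuffleConsumerPlugin.Context context)
        throws ClassNotFoundException, IOException {
      // Keep whatever the custom transport needs from the context:
      // context.getReduceTask(), context.getUmbilical(),
      // context.getConf(), context.getReporter().
    }

    public boolean fetchOutputs() throws IOException {
      // Fetch all map outputs over the custom transport; on a merge
      // failure, record the cause in mergeThrowable and return false.
      return true;
    }

    public Throwable getMergeThrowable() {
      return mergeThrowable;
    }

    public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs,
        Reporter reporter) throws IOException {
      // A real plugin returns the iterator over its merged map outputs here.
      return null;
    }

    public void close() {
      // Release any transport resources held since init().
    }
  }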
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/ShuffleProviderPlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface is implemented by objects that can answer shuffle requests
+ * sent from a matching shuffle consumer that lives in the context of a ReduceTask object.
+ *
+ * A ShuffleProviderPlugin object will be notified of the following events:
+ * initialize, destroy.
+ *
+ * NOTE: This interface is also used when loading 3rd-party plugins at runtime.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("MapReduce")
+@InterfaceStability.Unstable
+public interface ShuffleProviderPlugin {
+ /**
+ * Do constructor work here.
+ * This method is invoked by the TaskTracker constructor.
+ */
+ public void initialize(TaskTracker taskTracker);
+
+ /**
+ * Close and clean up any resources, including threads and disk space.
+ * This method is invoked by TaskTracker.shutdown.
+ */
+ public void destroy();
+}
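[Editorial note: correspondingly, a minimal provider-side sketch might look like the following. Again, the com.example package is hypothetical and the method bodies are only illustrative.]

  package com.example;

  import org.apache.hadoop.mapred.ShuffleProviderPlugin;
  import org.apache.hadoop.mapred.TaskTracker;

  public class MyProviderPlugin implements ShuffleProviderPlugin {
    public void initialize(TaskTracker taskTracker) {
      // Start the service that answers shuffle requests, reading any
      // settings it needs from taskTracker.getJobConf().
    }

    public void destroy() {
      // Stop the service and release its threads and disk space.
    }
  }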
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Jul 24 23:11:17 2013
@@ -141,6 +141,9 @@ public class TaskTracker implements MRCo
static final long WAIT_FOR_DONE = 3 * 1000;
private int httpPort;
+ public static final String SHUFFLE_PROVIDER_PLUGIN_CLASSES = "mapreduce.shuffle.provider.plugin.classes";
+ final private ShuffleProviderPlugin shuffleProviderPlugin = new MultiShuffleProviderPlugin();
+
static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
static{
@@ -233,6 +236,52 @@ public class TaskTracker implements MRCo
}
}
+ public static class DefaultShuffleProvider implements ShuffleProviderPlugin {
+ public void initialize(TaskTracker tt) {
+ tt.server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+ }
+
+ public void destroy() {
+ }
+ }
+
+ private static class MultiShuffleProviderPlugin implements ShuffleProviderPlugin {
+
+ private ShuffleProviderPlugin[] plugins;
+
+ public void initialize(TaskTracker tt) {
+ Configuration conf = tt.getJobConf();
+ Class<?>[] klasses = conf.getClasses(SHUFFLE_PROVIDER_PLUGIN_CLASSES, DefaultShuffleProvider.class);
+
+ plugins = new ShuffleProviderPlugin[klasses.length];
+ for (int i = 0; i < klasses.length; i++) {
+ try{
+ LOG.info(" Loading ShuffleProviderPlugin: " + klasses[i]);
+ plugins[i] = (ShuffleProviderPlugin)ReflectionUtils.newInstance(klasses[i], conf);
+ plugins[i].initialize(tt);
+ }
+ catch(Throwable t) {
+ LOG.warn("Exception instantiating/initializing a ShuffleProviderPlugin: " + klasses[i], t);
+ plugins[i] = null;
+ }
+ }
+ }
+
+ public void destroy() {
+ if (plugins != null) {
+ for (ShuffleProviderPlugin plugin : plugins) {
+ try {
+ if (plugin != null) {
+ plugin.destroy();
+ }
+ } catch (Throwable t) {
+ LOG.warn("Exception destroying a ShuffleProviderPlugin: " + plugin, t);
+ }
+ }
+ }
+ }
+ }
+
private LocalStorage localStorage;
private long lastCheckDirsTime;
private int lastNumFailures;
@@ -697,7 +746,7 @@ public class TaskTracker implements MRCo
+ TaskTracker.LOCAL_SPLIT_FILE;
}
- static String getIntermediateOutputDir(String user, String jobid,
+ public static String getIntermediateOutputDir(String user, String jobid,
String taskid) {
return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+ TaskTracker.OUTPUT;
@@ -1433,6 +1482,14 @@ public class TaskTracker implements MRCo
public synchronized void shutdown() throws IOException, InterruptedException {
shuttingDown = true;
close();
+ if (this.shuffleProviderPlugin != null) {
+ try {
+ LOG.info("Shutting down shuffleProviderPlugin");
+ this.shuffleProviderPlugin.destroy();
+ } catch (Exception e) {
+ LOG.warn("Exception shutting down shuffleProviderPlugin", e);
+ }
+ }
if (this.server != null) {
try {
LOG.info("Shutting down StatusHttpServer");
@@ -1611,7 +1668,7 @@ public class TaskTracker implements MRCo
server.setAttribute("shuffleExceptionTracking", shuffleExceptionTracking);
- server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+ shuffleProviderPlugin.initialize(this);
server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
server.start();
this.httpPort = server.getPort();
@@ -3900,9 +3957,22 @@ public class TaskTracker implements MRCo
}
/**
+ * Get the specific job conf for a running job.
+ */
+ public JobConf getJobConf(JobID jobId) throws IOException {
+ synchronized (runningJobs) {
+ RunningJob rjob = runningJobs.get(jobId);
+ if (rjob == null) {
+ throw new IOException("Unknown job " + jobId + "!!");
+ }
+ return rjob.getJobConf();
+ }
+ }
+
+ /**
* Get the default job conf for this tracker.
*/
- JobConf getJobConf() {
+ public JobConf getJobConf() {
return fConf;
}
@@ -4038,16 +4108,10 @@ public class TaskTracker implements MRCo
FileSystem rfs = ((LocalFileSystem)
context.getAttribute("local.file.system")).getRaw();
- String userName = null;
- String runAsUserName = null;
- synchronized (tracker.runningJobs) {
- RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
- if (rjob == null) {
- throw new IOException("Unknown job " + jobId + "!!");
- }
- userName = rjob.jobConf.getUser();
- runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
- }
+ JobConf jobConf = tracker.getJobConf(JobID.forName(jobId));
+ String userName = jobConf.getUser();
+ String runAsUserName = tracker.getTaskController().getRunAsUser(jobConf);
+
// Index file
String intermediateOutputDir = TaskTracker.getIntermediateOutputDir(userName, jobId, mapId);
String indexKey = intermediateOutputDir + "/file.out.index";
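[Editorial note: a practical consequence of making getJobConf(JobID), getJobConf() and getIntermediateOutputDir public is that a provider can resolve per-job settings while serving a fetch, much as MapOutputServlet does above. A hedged sketch follows; jobIdStr and mapIdStr are hypothetical request parameters, and it assumes the plugin can reach these methods, which the new TestShufflePlugin exercises.]

  // Inside a hypothetical provider's request handler:
  JobConf jobConf = taskTracker.getJobConf(JobID.forName(jobIdStr));
  String userName = jobConf.getUser();
  String runAsUser = taskTracker.getTaskController().getRunAsUser(jobConf);
  String outDir =
      TaskTracker.getIntermediateOutputDir(userName, jobIdStr, mapIdStr);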
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Wed Jul 24 23:11:17 2013
@@ -67,6 +67,9 @@ public class JobContext {
public static final String USER_LOG_RETAIN_HOURS =
"mapred.userlog.retain.hours";
+ public static final String SHUFFLE_CONSUMER_PLUGIN_ATTR =
+ "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
/**
* The UserGroupInformation object that has a reference to the current user
*/
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java?rev=1506774&r1=1506773&r2=1506774&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java Wed Jul 24 23:11:17 2013
@@ -39,10 +39,8 @@ public class TestReduceTaskFetchFail {
public String getJobFile() { return "/foo"; }
public class TestReduceCopier extends ReduceCopier {
- public TestReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
- TaskReporter reporter
- )throws ClassNotFoundException, IOException {
- super(umbilical, conf, reporter);
+ public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
+ super.init(context);
}
public void checkAndInformJobTracker(int failures, TaskAttemptID mapId, boolean readError) {
@@ -69,8 +67,10 @@ public class TestReduceTaskFetchFail {
TaskAttemptID tid = new TaskAttemptID();
TestReduceTask rTask = new TestReduceTask();
rTask.setConf(conf);
+ ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
- ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+ ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier();
+ reduceCopier.init(context);
reduceCopier.checkAndInformJobTracker(1, tid, false);
verify(mockTaskReporter, never()).progress();
@@ -82,7 +82,9 @@ public class TestReduceTaskFetchFail {
conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);
rTask.setConf(conf);
- reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+ context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
+ reduceCopier = rTask.new TestReduceCopier();
+ reduceCopier.init(context);
reduceCopier.checkAndInformJobTracker(1, tid, false);
verify(mockTaskReporter, times(1)).progress();
@@ -103,7 +105,9 @@ public class TestReduceTaskFetchFail {
conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);
rTask.setConf(conf);
- reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+ context = new ShuffleConsumerPlugin.Context(rTask, mockUmbilical, conf, mockTaskReporter);
+ reduceCopier = rTask.new TestReduceCopier();
+ reduceCopier.init(context);
reduceCopier.checkAndInformJobTracker(7, tid, true);
verify(mockTaskReporter, times(4)).progress();
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java?rev=1506774&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestShufflePlugin.java Wed Jul 24 23:11:17 2013
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.fs.LocalFileSystem;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * A JUnit test for the availability and accessibility of the main API needed
+ * by subclasses of ShuffleProviderPlugin and ShuffleConsumerPlugin.
+ * This test is important for preserving the API for 3rd-party plugins.
+ */
+public class TestShufflePlugin {
+
+ static class TestShuffleConsumerPlugin implements ShuffleConsumerPlugin {
+ @Override
+ public void init(ShuffleConsumerPlugin.Context context) {
+ // just verify that Context has kept its public interface
+ context.getReduceTask();
+ context.getConf();
+ context.getUmbilical();
+ context.getReporter();
+ }
+ @Override
+ public boolean fetchOutputs() throws IOException{
+ return true;
+ }
+ @Override
+ public Throwable getMergeThrowable(){
+ return null;
+ }
+ @Override
+ public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException{
+ return null;
+ }
+ @Override
+ public void close(){
+ }
+ }
+
+ @Test
+ /**
+ * A test instructing core Hadoop to load an external ShuffleConsumerPlugin
+ * as if it came from a 3rd party.
+ */
+ public void testConsumerPluginAbility() {
+
+ try{
+ // create a JobConf with mapreduce.job.reduce.shuffle.consumer.plugin.class=TestShuffleConsumerPlugin
+ JobConf jobConf = new JobConf();
+ jobConf.setClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR,
+ TestShufflePlugin.TestShuffleConsumerPlugin.class,
+ ShuffleConsumerPlugin.class);
+
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ jobConf.getClass(JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, null, ShuffleConsumerPlugin.class);
+ assertNotNull("Unable to get " + JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, clazz);
+
+ // load 3rd party plugin through core's factory method
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+ assertNotNull("Unable to load " + JobContext.SHUFFLE_CONSUMER_PLUGIN_ATTR, shuffleConsumerPlugin);
+ }
+ catch (Exception e) {
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+
+ static class TestShuffleProviderPlugin implements ShuffleProviderPlugin {
+ @Override
+ public void initialize(TaskTracker tt) {
+ }
+ @Override
+ public void destroy(){
+ }
+ }
+
+ @Test
+ /**
+ * A test instructing core Hadoop to load an external ShuffleProviderPlugin
+ * as if it came from a 3rd party.
+ */
+ public void testProviderPluginAbility() {
+
+ try{
+ // create a JobConf with mapreduce.shuffle.provider.plugin.classes=TestShuffleProviderPlugin
+ JobConf jobConf = new JobConf();
+ jobConf.setClass(TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES,
+ TestShufflePlugin.TestShuffleProviderPlugin.class,
+ ShuffleProviderPlugin.class);
+
+ ShuffleProviderPlugin shuffleProviderPlugin = null;
+ Class<? extends ShuffleProviderPlugin> clazz =
+ jobConf.getClass(TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, null, ShuffleProviderPlugin.class);
+ assertNotNull("Unable to get " + TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, clazz);
+
+ // load 3rd party plugin through core's factory method
+ shuffleProviderPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+ assertNotNull("Unable to load " + TaskTracker.SHUFFLE_PROVIDER_PLUGIN_CLASSES, shuffleProviderPlugin);
+ }
+ catch (Exception e) {
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+
+ @Test
+ /**
+ * A method for testing the availability and accessibility of the API needed by subclasses of ShuffleProviderPlugin
+ */
+ public void testProvider() {
+ //mock creation
+ ShuffleProviderPlugin mockShuffleProvider = mock(ShuffleProviderPlugin.class);
+ TaskTracker mockTT = mock(TaskTracker.class);
+ TaskController mockTaskController = mock(TaskController.class);
+
+ mockShuffleProvider.initialize(mockTT);
+ mockShuffleProvider.destroy();
+ try {
+ mockTT.getJobConf();
+ mockTT.getJobConf(mock(JobID.class));
+ mockTT.getIntermediateOutputDir("","","");
+ mockTT.getTaskController();
+ mockTaskController.getRunAsUser(mock(JobConf.class));
+ }
+ catch (Exception e){
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+
+ @Test
+ /**
+ * A method for testing the availability and accessibility of the API needed by subclasses of ShuffleConsumerPlugin
+ */
+ public void testConsumer() {
+ //mock creation
+ ShuffleConsumerPlugin mockShuffleConsumer = mock(ShuffleConsumerPlugin.class);
+ ReduceTask mockReduceTask = mock(ReduceTask.class);
+ JobConf mockJobConf = mock(JobConf.class);
+ TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+ TaskReporter mockReporter = mock(TaskReporter.class);
+ LocalFileSystem mockLocalFileSystem = mock(LocalFileSystem.class);
+
+ mockReduceTask.getTaskID();
+ mockReduceTask.getJobID();
+ mockReduceTask.getNumMaps();
+ mockReduceTask.getPartition();
+ mockReduceTask.getJobFile();
+ mockReduceTask.getJvmContext();
+
+ mockReporter.progress();
+
+ try {
+ String [] dirs = mockJobConf.getLocalDirs();
+ ShuffleConsumerPlugin.Context context = new ShuffleConsumerPlugin.Context(mockReduceTask, mockUmbilical, mockJobConf, mockReporter);
+ mockShuffleConsumer.init(context);
+ mockShuffleConsumer.fetchOutputs();
+ mockShuffleConsumer.createKVIterator(mockJobConf, mockLocalFileSystem.getRaw(), mockReporter);
+ mockShuffleConsumer.close();
+ }
+ catch (Exception e){
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+}