You are viewing a plain-text version of this content. (The hyperlink to its canonical version was not preserved in this plain-text copy.)
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/08/06 08:40:43 UTC
svn commit: r1510867 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/or...
Author: sandy
Date: Tue Aug 6 06:40:42 2013
New Revision: 1510867
URL: http://svn.apache.org/r1510867
Log:
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and Aaron Kimball via Sandy Ryza)
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
- copied unchanged from r1510866, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Aug 6 06:40:42 2013
@@ -20,6 +20,9 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
+ Aaron Kimball via Sandy Ryza)
+
OPTIMIZATIONS
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Aug 6 06:40:42 2013
@@ -79,11 +79,15 @@ public class LocalJobRunner implements C
public static final String LOCAL_MAX_MAPS =
"mapreduce.local.map.tasks.maximum";
+ /** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
+ public static final String LOCAL_MAX_REDUCES =
+ "mapreduce.local.reduce.tasks.maximum";
+
private FileSystem fs;
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
private JobConf conf;
private AtomicInteger map_tasks = new AtomicInteger(0);
- private int reduce_tasks = 0;
+ private AtomicInteger reduce_tasks = new AtomicInteger(0);
final Random rand = new Random();
private LocalJobRunnerMetrics myMetrics = null;
@@ -115,9 +119,11 @@ public class LocalJobRunner implements C
private JobConf job;
private int numMapTasks;
+ private int numReduceTasks;
private float [] partialMapProgress;
+ private float [] partialReduceProgress;
private Counters [] mapCounters;
- private Counters reduceCounters;
+ private Counters [] reduceCounters;
private JobStatus status;
private List<TaskAttemptID> mapIds = Collections.synchronizedList(
@@ -184,10 +190,14 @@ public class LocalJobRunner implements C
this.start();
}
+ protected abstract class RunnableWithThrowable implements Runnable {
+ public volatile Throwable storedException;
+ }
+
/**
* A Runnable instance that handles a map task to be run by an executor.
*/
- protected class MapTaskRunnable implements Runnable {
+ protected class MapTaskRunnable extends RunnableWithThrowable {
private final int taskId;
private final TaskSplitMetaInfo info;
private final JobID jobId;
@@ -198,8 +208,6 @@ public class LocalJobRunner implements C
// where to fetch mapper outputs.
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
- public volatile Throwable storedException;
-
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
this.info = info;
@@ -253,12 +261,13 @@ public class LocalJobRunner implements C
* @param mapOutputFiles a mapping from task attempts to output files
* @return a List of Runnables, one per map task.
*/
- protected List<MapTaskRunnable> getMapTaskRunnables(
+ protected List<RunnableWithThrowable> getMapTaskRunnables(
TaskSplitMetaInfo [] taskInfo, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
int numTasks = 0;
- ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+ ArrayList<RunnableWithThrowable> list =
+ new ArrayList<RunnableWithThrowable>();
for (TaskSplitMetaInfo task : taskInfo) {
list.add(new MapTaskRunnable(task, numTasks++, jobId,
mapOutputFiles));
@@ -267,12 +276,89 @@ public class LocalJobRunner implements C
return list;
}
+ protected class ReduceTaskRunnable extends RunnableWithThrowable {
+ private final int taskId;
+ private final JobID jobId;
+ private final JobConf localConf;
+
+ // This is a reference to a shared object passed in by the
+ // external context; this delivers state to the reducers regarding
+ // where to fetch mapper outputs.
+ private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
+
+ public ReduceTaskRunnable(int taskId, JobID jobId,
+ Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+ this.taskId = taskId;
+ this.jobId = jobId;
+ this.mapOutputFiles = mapOutputFiles;
+ this.localConf = new JobConf(job);
+ this.localConf.set("mapreduce.jobtracker.address", "local");
+ }
+
+ public void run() {
+ try {
+ TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+ jobId, TaskType.REDUCE, taskId), 0);
+ LOG.info("Starting task: " + reduceId);
+
+ ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
+ reduceId, taskId, mapIds.size(), 1);
+ reduce.setUser(UserGroupInformation.getCurrentUser().
+ getShortUserName());
+ setupChildMapredLocalDirs(localJobDir, reduce, localConf);
+ reduce.setLocalMapFiles(mapOutputFiles);
+
+ if (!Job.this.isInterrupted()) {
+ reduce.setJobFile(localJobFile.toString());
+ localConf.setUser(reduce.getUser());
+ reduce.localizeConfiguration(localConf);
+ reduce.setConf(localConf);
+ try {
+ reduce_tasks.getAndIncrement();
+ myMetrics.launchReduce(reduce.getTaskID());
+ reduce.run(localConf, Job.this);
+ myMetrics.completeReduce(reduce.getTaskID());
+ } finally {
+ reduce_tasks.getAndDecrement();
+ }
+
+ LOG.info("Finishing task: " + reduceId);
+ } else {
+ throw new InterruptedException();
+ }
+ } catch (Throwable t) {
+ // store this to be rethrown in the initial thread context.
+ this.storedException = t;
+ }
+ }
+ }
+
+ /**
+ * Create Runnables to encapsulate reduce tasks for use by the executor
+ * service.
+ * @param jobId the job id
+ * @param mapOutputFiles a mapping from task attempts to output files
+ * @return a List of Runnables, one per reduce task.
+ */
+ protected List<RunnableWithThrowable> getReduceTaskRunnables(
+ JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+
+ int taskId = 0;
+ ArrayList<RunnableWithThrowable> list =
+ new ArrayList<RunnableWithThrowable>();
+ for (int i = 0; i < this.numReduceTasks; i++) {
+ list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
+ }
+
+ return list;
+ }
+
/**
* Initialize the counters that will hold partial-progress from
* the various task attempts.
* @param numMaps the number of map tasks in this job.
*/
- private synchronized void initCounters(int numMaps) {
+ private synchronized void initCounters(int numMaps, int numReduces) {
// Initialize state trackers for all map tasks.
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
@@ -280,16 +366,22 @@ public class LocalJobRunner implements C
this.mapCounters[i] = new Counters();
}
- this.reduceCounters = new Counters();
+ this.partialReduceProgress = new float[numReduces];
+ this.reduceCounters = new Counters[numReduces];
+ for (int i = 0; i < numReduces; i++) {
+ this.reduceCounters[i] = new Counters();
+ }
+
+ this.numMapTasks = numMaps;
+ this.numReduceTasks = numReduces;
}
/**
* Creates the executor service used to run map tasks.
*
- * @param numMapTasks the total number of map tasks to be run
* @return an ExecutorService instance that handles map tasks
*/
- protected ExecutorService createMapExecutor(int numMapTasks) {
+ protected synchronized ExecutorService createMapExecutor() {
// Determine the size of the thread pool to use
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
@@ -297,13 +389,10 @@ public class LocalJobRunner implements C
throw new IllegalArgumentException(
"Configured " + LOCAL_MAX_MAPS + " must be >= 1");
}
- this.numMapTasks = numMapTasks;
maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
- initCounters(this.numMapTasks);
-
- LOG.debug("Starting thread pool executor.");
+ LOG.debug("Starting mapper thread pool executor.");
LOG.debug("Max local threads: " + maxMapThreads);
LOG.debug("Map tasks to process: " + this.numMapTasks);
@@ -315,6 +404,65 @@ public class LocalJobRunner implements C
return executor;
}
+
+ /**
+ * Creates the executor service used to run reduce tasks.
+ *
+ * @return an ExecutorService instance that handles reduce tasks
+ */
+ protected synchronized ExecutorService createReduceExecutor() {
+
+ // Determine the size of the thread pool to use
+ int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
+ if (maxReduceThreads < 1) {
+ throw new IllegalArgumentException(
+ "Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
+ }
+ maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
+ maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.
+
+ LOG.debug("Starting reduce thread pool executor.");
+ LOG.debug("Max local threads: " + maxReduceThreads);
+ LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
+
+ // Create a new executor service to drain the work queue.
+ ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+
+ return executor;
+ }
+
+ /** Run a set of tasks and waits for them to complete. */
+ private void runTasks(List<RunnableWithThrowable> runnables,
+ ExecutorService service, String taskType) throws Exception {
+ // Start populating the executor with work units.
+ // They may begin running immediately (in other threads).
+ for (Runnable r : runnables) {
+ service.submit(r);
+ }
+
+ try {
+ service.shutdown(); // Instructs queue to drain.
+
+ // Wait for tasks to finish; do not use a time-based timeout.
+ // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+ LOG.info("Waiting for " + taskType + " tasks");
+ service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException ie) {
+ // Cancel all threads.
+ service.shutdownNow();
+ throw ie;
+ }
+
+ LOG.info(taskType + " task executor complete.");
+
+ // After waiting for the tasks to complete, if any of these
+ // have thrown an exception, rethrow it now in the main thread context.
+ for (RunnableWithThrowable r : runnables) {
+ if (r.storedException != null) {
+ throw new Exception(r.storedException);
+ }
+ }
+ }
private org.apache.hadoop.mapreduce.OutputCommitter
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
@@ -360,94 +508,25 @@ public class LocalJobRunner implements C
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks();
- if (numReduceTasks > 1 || numReduceTasks < 0) {
- // we only allow 0 or 1 reducer in local mode
- numReduceTasks = 1;
- job.setNumReduceTasks(1);
- }
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
-
- List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
- jobId, mapOutputFiles);
- ExecutorService mapService = createMapExecutor(taskRunnables.size());
-
- // Start populating the executor with work units.
- // They may begin running immediately (in other threads).
- for (Runnable r : taskRunnables) {
- mapService.submit(r);
- }
+
+ List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
+ taskSplitMetaInfos, jobId, mapOutputFiles);
+
+ initCounters(mapRunnables.size(), numReduceTasks);
+ ExecutorService mapService = createMapExecutor();
+ runTasks(mapRunnables, mapService, "map");
try {
- mapService.shutdown(); // Instructs queue to drain.
-
- // Wait for tasks to finish; do not use a time-based timeout.
- // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
- LOG.info("Waiting for map tasks");
- mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- } catch (InterruptedException ie) {
- // Cancel all threads.
- mapService.shutdownNow();
- throw ie;
- }
-
- LOG.info("Map task executor complete.");
-
- // After waiting for the map tasks to complete, if any of these
- // have thrown an exception, rethrow it now in the main thread context.
- for (MapTaskRunnable r : taskRunnables) {
- if (r.storedException != null) {
- throw new Exception(r.storedException);
- }
- }
-
- TaskAttemptID reduceId =
- new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
- try {
if (numReduceTasks > 0) {
- ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
- reduceId, 0, mapIds.size(), 1);
- reduce.setUser(UserGroupInformation.getCurrentUser().
- getShortUserName());
- JobConf localConf = new JobConf(job);
- localConf.set("mapreduce.jobtracker.address", "local");
- setupChildMapredLocalDirs(localJobDir, reduce, localConf);
- // move map output to reduce input
- for (int i = 0; i < mapIds.size(); i++) {
- if (!this.isInterrupted()) {
- TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
- MapOutputFile localOutputFile = new MROutputFiles();
- localOutputFile.setConf(localConf);
- Path reduceIn =
- localOutputFile.getInputFileForWrite(mapId.getTaskID(),
- localFs.getFileStatus(mapOut).getLen());
- if (!localFs.mkdirs(reduceIn.getParent())) {
- throw new IOException("Mkdirs failed to create "
- + reduceIn.getParent().toString());
- }
- if (!localFs.rename(mapOut, reduceIn))
- throw new IOException("Couldn't rename " + mapOut);
- } else {
- throw new InterruptedException();
- }
- }
- if (!this.isInterrupted()) {
- reduce.setJobFile(localJobFile.toString());
- localConf.setUser(reduce.getUser());
- reduce.localizeConfiguration(localConf);
- reduce.setConf(localConf);
- reduce_tasks += 1;
- myMetrics.launchReduce(reduce.getTaskID());
- reduce.run(localConf, this);
- myMetrics.completeReduce(reduce.getTaskID());
- reduce_tasks -= 1;
- } else {
- throw new InterruptedException();
- }
+ List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
+ jobId, mapOutputFiles);
+ ExecutorService reduceService = createReduceExecutor();
+ runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
@@ -465,7 +544,6 @@ public class LocalJobRunner implements C
}
JobEndNotifier.localRunnerNotification(job, status);
-
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext,
@@ -511,12 +589,13 @@ public class LocalJobRunner implements C
new ByteArrayInputStream(baos.toByteArray())));
LOG.info(taskStatus.getStateString());
- int taskIndex = mapIds.indexOf(taskId);
- if (taskIndex >= 0) { // mapping
+ int mapTaskIndex = mapIds.indexOf(taskId);
+ if (mapTaskIndex >= 0) {
+ // mapping
float numTasks = (float) this.numMapTasks;
- partialMapProgress[taskIndex] = taskStatus.getProgress();
- mapCounters[taskIndex] = taskStatus.getCounters();
+ partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
+ mapCounters[mapTaskIndex] = taskStatus.getCounters();
float partialProgress = 0.0f;
for (float f : partialMapProgress) {
@@ -524,8 +603,18 @@ public class LocalJobRunner implements C
}
status.setMapProgress(partialProgress / numTasks);
} else {
- reduceCounters = taskStatus.getCounters();
- status.setReduceProgress(taskStatus.getProgress());
+ // reducing
+ int reduceTaskIndex = taskId.getTaskID().getId();
+ float numTasks = (float) this.numReduceTasks;
+
+ partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
+ reduceCounters[reduceTaskIndex] = taskStatus.getCounters();
+
+ float partialProgress = 0.0f;
+ for (float f : partialReduceProgress) {
+ partialProgress += f;
+ }
+ status.setReduceProgress(partialProgress / numTasks);
}
// ignore phase
@@ -545,7 +634,13 @@ public class LocalJobRunner implements C
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}
- current = Counters.sum(current, reduceCounters);
+
+ if (null != reduceCounters && reduceCounters.length > 0) {
+ for (Counters c : reduceCounters) {
+ current = Counters.sum(current, c);
+ }
+ }
+
return current;
}
@@ -684,8 +779,9 @@ public class LocalJobRunner implements C
public ClusterMetrics getClusterMetrics() {
int numMapTasks = map_tasks.get();
- return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
- reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+ int numReduceTasks = reduce_tasks.get();
+ return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
+ numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
public JobTrackerStatus getJobTrackerStatus() {
@@ -816,6 +912,27 @@ public class LocalJobRunner implements C
return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
}
+
+ /**
+ * Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
+ * @param job the job to configure
+ * @param maxReduces the maximum number of reduce tasks to allow.
+ */
+ public static void setLocalMaxRunningReduces(
+ org.apache.hadoop.mapreduce.JobContext job,
+ int maxReduces) {
+ job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
+ }
+
+ /**
+ * @return the max number of reduce tasks to run concurrently in the
+ * LocalJobRunner.
+ */
+ public static int getLocalMaxRunningReduces(
+ org.apache.hadoop.mapreduce.JobContext job) {
+ return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
+ }
+
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
) throws IOException,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Tue Aug 6 06:40:42 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
@@ -1860,7 +1861,6 @@ public class MapTask extends Task {
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
- Merger.considerFinalMergeForProgress();
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1893,7 +1893,8 @@ public class MapTask extends Task {
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
- null, spilledRecordsCounter, sortPhase.phase());
+ null, spilledRecordsCounter, sortPhase.phase(),
+ TaskType.MAP);
//write merged output to disk
long segmentStart = finalOut.getPos();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug 6 06:40:42 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -69,7 +70,8 @@ public class Merger {
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
- reporter, null).merge(keyClass, valueClass,
+ reporter, null,
+ TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@@ -90,7 +92,8 @@ public class Merger {
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
- reporter, mergedMapOutputsCounter).merge(
+ reporter, mergedMapOutputsCounter,
+ TaskType.REDUCE).merge(
keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
@@ -124,7 +127,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
+ sortSegments,
+ TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@@ -140,10 +144,12 @@ public class Merger {
boolean sortSegments,
Counters.Counter readsCounter,
Counters.Counter writesCounter,
- Progress mergePhase)
+ Progress mergePhase,
+ TaskType taskType)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
+ sortSegments, codec,
+ taskType).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@@ -161,7 +167,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
- sortSegments).merge(keyClass, valueClass,
+ sortSegments,
+ TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@@ -182,7 +189,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
- sortSegments, codec).merge(keyClass, valueClass,
+ sortSegments, codec,
+ TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@@ -366,20 +374,7 @@ public class Merger {
}
}
}
-
- // Boolean variable for including/considering final merge as part of sort
- // phase or not. This is true in map task, false in reduce task. It is
- // used in calculating mergeProgress.
- static boolean includeFinalMerge = false;
-
- /**
- * Sets the boolean variable includeFinalMerge to true. Called from
- * map task before calling merge() so that final merge of map task
- * is also considered as part of sort phase.
- */
- static void considerFinalMergeForProgress() {
- includeFinalMerge = true;
- }
+
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
@@ -401,6 +396,21 @@ public class Merger {
final DataInputBuffer value = new DataInputBuffer();
final DataInputBuffer diskIFileValue = new DataInputBuffer();
+
+ // Boolean variable for including/considering final merge as part of sort
+ // phase or not. This is true in map task, false in reduce task. It is
+ // used in calculating mergeProgress.
+ private boolean includeFinalMerge = false;
+
+ /**
+ * Sets the boolean variable includeFinalMerge to true. Called from
+ * map task before calling merge() so that final merge of map task
+ * is also considered as part of sort phase.
+ */
+ private void considerFinalMergeForProgress() {
+ includeFinalMerge = true;
+ }
+
Segment<K, V> minSegment;
Comparator<Segment<K, V>> segmentComparator =
new Comparator<Segment<K, V>>() {
@@ -419,14 +429,16 @@ public class Merger {
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter)
throws IOException {
- this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
+ this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
+ TaskType.REDUCE);
}
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter,
- Counters.Counter mergedMapOutputsCounter)
+ Counters.Counter mergedMapOutputsCounter,
+ TaskType taskType)
throws IOException {
this.conf = conf;
this.fs = fs;
@@ -434,6 +446,10 @@ public class Merger {
this.comparator = comparator;
this.reporter = reporter;
+ if (taskType == TaskType.MAP) {
+ considerFinalMergeForProgress();
+ }
+
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs,
@@ -449,17 +465,20 @@ public class Merger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
Progressable reporter) {
- this(conf, fs, segments, comparator, reporter, false);
+ this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
}
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
- Progressable reporter, boolean sortSegments) {
+ Progressable reporter, boolean sortSegments, TaskType taskType) {
this.conf = conf;
this.fs = fs;
this.comparator = comparator;
this.segments = segments;
this.reporter = reporter;
+ if (taskType == TaskType.MAP) {
+ considerFinalMergeForProgress();
+ }
if (sortSegments) {
Collections.sort(segments, segmentComparator);
}
@@ -467,8 +486,10 @@ public class Merger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
- Progressable reporter, boolean sortSegments, CompressionCodec codec) {
- this(conf, fs, segments, comparator, reporter, sortSegments);
+ Progressable reporter, boolean sortSegments, CompressionCodec codec,
+ TaskType taskType) {
+ this(conf, fs, segments, comparator, reporter, sortSegments,
+ taskType);
this.codec = codec;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Tue Aug 6 06:40:42 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -74,6 +75,10 @@ public class ReduceTask extends Task {
private CompressionCodec codec;
+ // If this is a LocalJobRunner-based job, this will
+ // be a mapping from map task attempts to their output files.
+ // This will be null in other cases.
+ private Map<TaskAttemptID, MapOutputFile> localMapFiles;
{
getProgress().setStatus("reduce");
@@ -105,24 +110,24 @@ public class ReduceTask extends Task {
// file paths, the first parameter is considered smaller than the second one.
// In case of files with same size and path are considered equal.
private Comparator<FileStatus> mapOutputFileComparator =
- new Comparator<FileStatus>() {
- public int compare(FileStatus a, FileStatus b) {
- if (a.getLen() < b.getLen())
- return -1;
- else if (a.getLen() == b.getLen())
- if (a.getPath().toString().equals(b.getPath().toString()))
- return 0;
- else
- return -1;
+ new Comparator<FileStatus>() {
+ public int compare(FileStatus a, FileStatus b) {
+ if (a.getLen() < b.getLen())
+ return -1;
+ else if (a.getLen() == b.getLen())
+ if (a.getPath().toString().equals(b.getPath().toString()))
+ return 0;
else
- return 1;
- }
+ return -1;
+ else
+ return 1;
+ }
};
-
+
// A sorted set for keeping a set of map output files on disk
private final SortedSet<FileStatus> mapOutputFilesOnDisk =
- new TreeSet<FileStatus>(mapOutputFileComparator);
-
+ new TreeSet<FileStatus>(mapOutputFileComparator);
+
public ReduceTask() {
super();
}
@@ -133,6 +138,17 @@ public class ReduceTask extends Task {
this.numMaps = numMaps;
}
+
+ /**
+ * Register the set of mapper outputs created by a LocalJobRunner-based
+ * job with this ReduceTask so it knows where to fetch from.
+ *
+ * This should not be called in normal (networked) execution.
+ */
+ public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
+ this.localMapFiles = mapFiles;
+ }
+
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
@@ -174,20 +190,11 @@ public class ReduceTask extends Task {
numMaps = in.readInt();
}
- // Get the input files for the reducer.
- private Path[] getMapFiles(FileSystem fs, boolean isLocal)
- throws IOException {
+ // Get the input files for the reducer (for local jobs).
+ private Path[] getMapFiles(FileSystem fs) throws IOException {
List<Path> fileList = new ArrayList<Path>();
- if (isLocal) {
- // for local jobs
- for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i));
- }
- } else {
- // for non local jobs
- for (FileStatus filestatus : mapOutputFilesOnDisk) {
- fileList.add(filestatus.getPath());
- }
+ for(int i = 0; i < numMaps; ++i) {
+ fileList.add(mapOutputFile.getInputFile(i));
}
return fileList.toArray(new Path[0]);
}
@@ -341,56 +348,33 @@ public class ReduceTask extends Task {
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
- ShuffleConsumerPlugin shuffleConsumerPlugin = null;
-
- boolean isLocal = false;
- // local if
- // 1) framework == local or
- // 2) framework == null and job tracker address == local
- String framework = job.get(MRConfig.FRAMEWORK_NAME);
- String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
- if ((framework == null && masterAddr.equals("local"))
- || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
- isLocal = true;
- }
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
- if (!isLocal) {
- Class combinerClass = conf.getCombinerClass();
- CombineOutputCollector combineCollector =
- (null != combinerClass) ?
- new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
-
- Class<? extends ShuffleConsumerPlugin> clazz =
- job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
-
- shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
- LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
-
- ShuffleConsumerPlugin.Context shuffleContext =
- new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
- super.lDirAlloc, reporter, codec,
- combinerClass, combineCollector,
- spilledRecordsCounter, reduceCombineInputCounter,
- shuffledMapsCounter,
- reduceShuffleBytes, failedShuffleCounter,
- mergedMapOutputsCounter,
- taskStatus, copyPhase, sortPhase, this,
- mapOutputFile);
- shuffleConsumerPlugin.init(shuffleContext);
- rIter = shuffleConsumerPlugin.run();
- } else {
- // local job runner doesn't have a copy phase
- copyPhase.complete();
- final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec,
- getMapFiles(rfs, true),
- !conf.getKeepFailedTaskFiles(),
- job.getInt(JobContext.IO_SORT_FACTOR, 100),
- new Path(getTaskID().toString()),
- job.getOutputKeyComparator(),
- reporter, spilledRecordsCounter, null, null);
- }
+ Class combinerClass = conf.getCombinerClass();
+ CombineOutputCollector combineCollector =
+ (null != combinerClass) ?
+ new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
+
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+ LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+ ShuffleConsumerPlugin.Context shuffleContext =
+ new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
+ super.lDirAlloc, reporter, codec,
+ combinerClass, combineCollector,
+ spilledRecordsCounter, reduceCombineInputCounter,
+ shuffledMapsCounter,
+ reduceShuffleBytes, failedShuffleCounter,
+ mergedMapOutputsCounter,
+ taskStatus, copyPhase, sortPhase, this,
+ mapOutputFile, localMapFiles);
+ shuffleConsumerPlugin.init(shuffleContext);
+
+ rIter = shuffleConsumerPlugin.run();
+
// free up the data structures
mapOutputFilesOnDisk.clear();
@@ -409,9 +393,7 @@ public class ReduceTask extends Task {
keyClass, valueClass);
}
- if (shuffleConsumerPlugin != null) {
- shuffleConsumerPlugin.close();
- }
+ shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Tue Aug 6 06:40:42 2013
@@ -19,6 +19,8 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.Map;
+
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -65,6 +67,7 @@ public interface ShuffleConsumerPlugin<K
private final Progress mergePhase;
private final Task reduceTask;
private final MapOutputFile mapOutputFile;
+ private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
JobConf jobConf, FileSystem localFS,
@@ -80,7 +83,8 @@ public interface ShuffleConsumerPlugin<K
Counters.Counter failedShuffleCounter,
Counters.Counter mergedMapOutputsCounter,
TaskStatus status, Progress copyPhase, Progress mergePhase,
- Task reduceTask, MapOutputFile mapOutputFile) {
+ Task reduceTask, MapOutputFile mapOutputFile,
+ Map<TaskAttemptID, MapOutputFile> localMapFiles) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.localFS = localFS;
@@ -101,6 +105,7 @@ public interface ShuffleConsumerPlugin<K
this.mergePhase = mergePhase;
this.reduceTask = reduceTask;
this.mapOutputFile = mapOutputFile;
+ this.localMapFiles = localMapFiles;
}
public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
@@ -163,6 +168,9 @@ public interface ShuffleConsumerPlugin<K
public MapOutputFile getMapOutputFile() {
return mapOutputFile;
}
+ public Map<TaskAttemptID, MapOutputFile> getLocalMapFiles() {
+ return localMapFiles;
+ }
} // end of public static class Context<K,V>
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Aug 6 06:40:42 2013
@@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
/* Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
- private final Reporter reporter;
+ protected final Reporter reporter;
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
@@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
- private final MergeManager<K,V> merger;
- private final ShuffleSchedulerImpl<K,V> scheduler;
- private final ShuffleClientMetrics metrics;
- private final ExceptionReporter exceptionReporter;
- private final int id;
+ protected final MergeManager<K,V> merger;
+ protected final ShuffleSchedulerImpl<K,V> scheduler;
+ protected final ShuffleClientMetrics metrics;
+ protected final ExceptionReporter exceptionReporter;
+ protected final int id;
private static int nextId = 0;
- private final int reduce;
+ protected final int reduce;
private final int connectionTimeout;
private final int readTimeout;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Aug 6 06:40:42 2013
@@ -18,10 +18,12 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
@@ -56,6 +58,7 @@ public class Shuffle<K, V> implements Sh
private Progress copyPhase;
private TaskStatus taskStatus;
private Task reduceTask; //Used for status updates
+ private Map<TaskAttemptID, MapOutputFile> localMapFiles;
@Override
public void init(ShuffleConsumerPlugin.Context context) {
@@ -69,6 +72,7 @@ public class Shuffle<K, V> implements Sh
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
+ this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
@@ -103,13 +107,22 @@ public class Shuffle<K, V> implements Sh
eventFetcher.start();
// Start the map-output fetcher threads
- final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+ boolean isLocal = localMapFiles != null;
+ final int numFetchers = isLocal ? 1 :
+ jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
- for (int i=0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
- reporter, metrics, this,
- reduceTask.getShuffleSecret());
- fetchers[i].start();
+ if (isLocal) {
+ fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
+ merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
+ localMapFiles);
+ fetchers[0].start();
+ } else {
+ for (int i=0; i < numFetchers; ++i) {
+ fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
+ reporter, metrics, this,
+ reduceTask.getShuffleSecret());
+ fetchers[i].start();
+ }
}
// Wait for shuffle to complete successfully
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Tue Aug 6 06:40:42 2013
@@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
- mockTask, mockMapOutputFile);
+ mockTask, mockMapOutputFile, null);
shuffleConsumerPlugin.init(context);
shuffleConsumerPlugin.run();
shuffleConsumerPlugin.close();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java Tue Aug 6 06:40:42 2013
@@ -276,18 +276,16 @@ public class TestJobCounters {
// there are too few spills to combine (2 < 3)
// Each map spills 2^14 records, so maps spill 49152 records, combined.
- // The reduce spill count is composed of the read from one segment and
- // the intermediate merge of the other two. The intermediate merge
+ // The combiner has emitted 24576 records to the reducer; these are all
+ // fetched straight to memory from the map side. The intermediate merge
// adds 8192 records per segment read; again, there are too few spills to
- // combine, so all 16834 are written to disk (total 32768 spilled records
- // for the intermediate merge). The merge into the reduce includes only
- // the unmerged segment, size 8192. Total spilled records in the reduce
- // is 32768 from the merge + 8192 unmerged segment = 40960 records
+ // combine, so all of them are written to disk. Total spilled records in the reduce
+ // is 8192 records / map * 3 maps = 24576.
- // Total: map + reduce = 49152 + 40960 = 90112
+ // Total: map + reduce = 49152 + 24576 = 73728
// 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 61440 output records
- validateCounters(c1, 90112, 15360, 61440);
+ validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
}
@@ -316,12 +314,12 @@ public class TestJobCounters {
// 1st merge: read + write = 8192 * 4
// 2nd merge: read + write = 8192 * 4
// final merge: 0
- // Total reduce: 65536
+ // Total reduce: 32768
- // Total: map + reduce = 2^16 + 2^16 = 131072
+ // Total: map + reduce = 2^16 + 2^15 = 98304
// 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 81920 output records
- validateCounters(c1, 131072, 20480, 81920);
+ validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -349,7 +347,7 @@ public class TestJobCounters {
// Total reduce: 45056
// 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 102400 output records
- validateCounters(c1, 147456, 25600, 102400);
+ validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -394,7 +392,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN0"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 90112, 15360, 61440);
+ validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -416,7 +414,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN1"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 131072, 20480, 81920);
+ validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -439,7 +437,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN2"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 147456, 25600, 102400);
+ validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Aug 6 06:40:42 2013
@@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
- conf.setNumReduceTasks(2);
+ conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
- //that we have two lines (both the lines must end up in the same
- //reducer since the partitioner takes the same key spec for all
- //lines
+ //that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java Tue Aug 6 06:40:42 2013
@@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -410,6 +410,7 @@ public class TestLocalRunner extends Tes
}
/** Test case for zero mappers */
+ @Test
public void testEmptyMaps() throws Exception {
Job job = Job.getInstance();
Path outputPath = getOutputPath();
@@ -428,5 +429,145 @@ public class TestLocalRunner extends Tes
boolean success = job.waitForCompletion(true);
assertTrue("Empty job should work", success);
}
+
+ /** @return the directory where numberfiles are written (mapper inputs) */
+ private Path getNumberDirPath() {
+ return new Path(getInputPath(), "numberfiles");
+ }
+
+ /**
+ * Write out an input file containing an integer as UTF-8 decimal text.
+ *
+ * @param fileNum the file number to write to.
+ * @param value the value to write to the file
+ * @return the path of the written file.
+ */
+ private Path makeNumberFile(int fileNum, int value) throws IOException {
+ Path filePath = new Path(getNumberDirPath(), "file" + fileNum);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+
+ // Explicit UTF-8 so TextInputFormat decodes it regardless of platform charset.
+ OutputStream os = fs.create(filePath);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
+ try {
+ w.write(Integer.toString(value));
+ } finally {
+ w.close();
+ }
+ return filePath;
+ }
+
+ /**
+ * Each record received by this mapper is a number 'n'.
+ * Emit the values [0..n-1]
+ */
+ public static class SequenceMapper
+ extends Mapper<LongWritable, Text, Text, NullWritable> {
+ @Override
+ public void map(LongWritable k, Text v, Context c)
+ throws IOException, InterruptedException {
+ int max = Integer.parseInt(v.toString().trim());
+ for (int i = 0; i < max; i++) {
+ c.write(new Text(Integer.toString(i)), NullWritable.get());
+ }
+ }
+ }
+
+ private static final int NUMBER_FILE_VAL = 100;
+
+ /**
+ * Tally up the values in every reducer output file and ensure that we
+ * got as much data out as we put in. Each mapper generated
+ * NUMBER_FILE_VAL values (0..NUMBER_FILE_VAL-1), so the expected total
+ * is numMaps * (0 + 1 + ... + NUMBER_FILE_VAL-1).
+ * @param numMaps the number of number files (mappers) in the job.
+ */
+ private void verifyNumberJob(int numMaps) throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ FileStatus [] stats = fs.listStatus(getOutputPath());
+ int valueSum = 0;
+ for (FileStatus f : stats) {
+ BufferedReader r = new BufferedReader(
+ new InputStreamReader(fs.open(f.getPath()), "UTF-8"));
+ try {
+ String line;
+ while ((line = r.readLine()) != null) {
+ valueSum += Integer.parseInt(line.trim());
+ }
+ } finally {
+ r.close();
+ }
+ }
+
+ int maxVal = NUMBER_FILE_VAL - 1;
+ int expectedPerMapper = maxVal * (maxVal + 1) / 2;
+ int expectedSum = expectedPerMapper * numMaps;
+ LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
+ assertEquals("Didn't get all our results back", expectedSum, valueSum);
+ }
+
+ /**
+ * Run a SequenceMapper / identity-reducer job over numMaps generated
+ * number files (each holding NUMBER_FILE_VAL) and verify the output sum.
+ */
+ private void doMultiReducerTest(int numMaps, int numReduces,
+ int parallelMaps, int parallelReduces) throws Exception {
+
+ Path in = getNumberDirPath();
+ Path out = getOutputPath();
+
+ // Clear data from any previous tests.
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(out)) {
+ fs.delete(out, true);
+ }
+
+ if (fs.exists(in)) {
+ fs.delete(in, true);
+ }
+ // verifyNumberJob() assumes every input file holds NUMBER_FILE_VAL.
+ for (int i = 0; i < numMaps; i++) {
+ makeNumberFile(i, NUMBER_FILE_VAL);
+ }
+
+ Job job = Job.getInstance();
+ job.setNumReduceTasks(numReduces);
+
+ job.setMapperClass(SequenceMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+ FileInputFormat.addInputPath(job, in);
+ FileOutputFormat.setOutputPath(job, out);
+
+ LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
+ LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
+
+ boolean result = job.waitForCompletion(true);
+ assertTrue("Job failed!!", result);
+
+ verifyNumberJob(numMaps);
+ }
+
+ @Test
+ public void testOneMapMultiReduce() throws Exception {
+ doMultiReducerTest(1, 2, 1, 1);
+ }
+
+ @Test
+ public void testOneMapMultiParallelReduce() throws Exception {
+ doMultiReducerTest(1, 2, 1, 2);
+ }
+
+ @Test
+ public void testMultiMapOneReduce() throws Exception {
+ doMultiReducerTest(4, 1, 2, 1);
+ }
+
+ @Test
+ public void testMultiMapMultiReduce() throws Exception {
+ doMultiReducerTest(4, 4, 2, 2);
+ }
+
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=1510867&r1=1510866&r2=1510867&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Tue Aug 6 06:40:42 2013
@@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparat
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
- Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+ Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
line1 +"\n" + line2 + "\n");
job.setMapperClass(InverseMapper.class);
job.setReducerClass(Reducer.class);