You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/15 21:13:34 UTC
svn commit: r406718 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Mon May 15 12:13:33 2006
New Revision: 406718
URL: http://svn.apache.org/viewcvs?rev=406718&view=rev
Log:
HADOOP-200. Avoid transmitting entire list of map task names to reduce tasks. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 12:13:33 2006
@@ -21,6 +21,11 @@
and miss his heartbeat window. This was killing many task trackers as
big jobs finished (300+ tasks / node). (omalley via cutting)
+ 6. HADOOP-200. Avoid transmitting entire list of map task names to
+ reduce tasks. Instead just transmit the number of map tasks and
+ henceforth refer to them by number when collecting map output.
+ (omalley via cutting)
+
Release 0.2.1 - 2006-05-12
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon May 15 12:13:33 2006
@@ -48,12 +48,14 @@
/** Called by a reduce task to find which map tasks are completed.
*
- * @param taskId the reduce task id
- * @param mapTasksNeeded an array of UTF8 naming map task ids whose output is needed.
+ * @param jobId the job id
+ * @param mapTasksNeeded an array of the mapIds that we need
+ * @param partition the reduce's id
* @return an array of MapOutputLocation
*/
- MapOutputLocation[] locateMapOutputs(String taskId,
- String[][] mapTasksNeeded
+ MapOutputLocation[] locateMapOutputs(String jobId,
+ int[] mapTasksNeeded,
+ int partition
) throws IOException;
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon May 15 12:13:33 2006
@@ -151,7 +151,8 @@
//
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
- reduces[i] = new TaskInProgress(uniqueString, jobFile, maps, i,
+ reduces[i] = new TaskInProgress(uniqueString, jobFile,
+ numMapTasks, i,
jobtracker, conf, this);
}
@@ -582,16 +583,34 @@
* Return the TaskInProgress that matches the tipid.
*/
public TaskInProgress getTaskInProgress(String tipid){
- for (int i = 0; i < maps.length; i++) {
- if (tipid.equals(maps[i].getTIPId())){
- return maps[i];
- }
- }
- for (int i = 0; i < reduces.length; i++) {
- if (tipid.equals(reduces[i].getTIPId())){
- return reduces[i];
- }
- }
- return null;
+ for (int i = 0; i < maps.length; i++) {
+ if (tipid.equals(maps[i].getTIPId())){
+ return maps[i];
+ }
+ }
+ for (int i = 0; i < reduces.length; i++) {
+ if (tipid.equals(reduces[i].getTIPId())){
+ return reduces[i];
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find the details of someplace where a map has finished
+ * @param mapId the id of the map
+ * @return the task status of the completed task
+ */
+ public TaskStatus findFinishedMap(int mapId) {
+ TaskInProgress tip = maps[mapId];
+ if (tip.isComplete()) {
+ TaskStatus[] statuses = tip.getTaskStatuses();
+ for(int i=0; i < statuses.length; i++) {
+ if (statuses[i].getRunState() == TaskStatus.SUCCEEDED) {
+ return statuses[i];
+ }
+ }
+ }
+ return null;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon May 15 12:13:33 2006
@@ -783,26 +783,27 @@
* yet closed, tasks. This exists so the reduce task thread can locate
* map task outputs.
*/
- public synchronized MapOutputLocation[] locateMapOutputs(String taskId, String[][] mapTasksNeeded) {
- ArrayList v = new ArrayList();
+ public synchronized MapOutputLocation[]
+ locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
+ ArrayList result = new ArrayList(mapTasksNeeded.length);
+ JobInProgress job = getJob(jobId);
for (int i = 0; i < mapTasksNeeded.length; i++) {
- for (int j = 0; j < mapTasksNeeded[i].length; j++) {
- TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(mapTasksNeeded[i][j]);
- if (tip != null && tip.isComplete(mapTasksNeeded[i][j])) {
- String trackerId = (String) taskidToTrackerMap.get(mapTasksNeeded[i][j]);
- TaskTrackerStatus tracker;
- synchronized (taskTrackers) {
- tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
- }
- v.add(new MapOutputLocation(mapTasksNeeded[i][j], tracker.getHost(), tracker.getPort()));
- break;
- }
- }
+ TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
+ if (status != null) {
+ String trackerId =
+ (String) taskidToTrackerMap.get(status.getTaskId());
+ TaskTrackerStatus tracker;
+ synchronized (taskTrackers) {
+ tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+ }
+ result.add(new MapOutputLocation(status.getTaskId(),
+ mapTasksNeeded[i],
+ tracker.getHost(),
+ tracker.getPort()));
+ }
}
- // randomly shuffle results to load-balance map output requests
- Collections.shuffle(v);
-
- return (MapOutputLocation[]) v.toArray(new MapOutputLocation[v.size()]);
+ return (MapOutputLocation[])
+ result.toArray(new MapOutputLocation[result.size()]);
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon May 15 12:13:33 2006
@@ -96,7 +96,7 @@
for (int i = 0; i < mapIds.size(); i++) {
String mapId = (String)mapIds.get(i);
Path mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
- Path reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
+ Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
localFs.mkdirs(reduceIn.getParent());
if (!localFs.rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
@@ -104,11 +104,8 @@
}
// run a single reduce task
- String mapDependencies[][] = new String[mapIds.size()][1];
- for (int i = 0; i < mapIds.size(); i++) {
- mapDependencies[i][0] = (String) mapIds.get(i);
- }
- ReduceTask reduce = new ReduceTask(file, reduceId, mapDependencies,0);
+ ReduceTask reduce = new ReduceTask(profile.getJobId(), file,
+ reduceId, mapIds.size(),0);
reduce.setConf(job);
reduce_tasks += 1;
reduce.run(job, this);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon May 15 12:13:33 2006
@@ -39,6 +39,7 @@
private String mapTaskId;
private String reduceTaskId;
+ private int mapId;
private int partition;
/** Permits reporting of file copy progress. */
@@ -66,18 +67,10 @@
* @param mapTaskId a map task id
* @param reduceTaskId a reduce task id
*/
- public Path getInputFile(String mapTaskId, String reduceTaskId)
+ public Path getInputFile(int mapId, String reduceTaskId)
throws IOException {
- return this.jobConf.getLocalPath(reduceTaskId+"/"+mapTaskId+".out");
- }
- public Path getInputFile(String mapTaskIds[], String reduceTaskId)
- throws IOException {
- for (int i = 0; i < mapTaskIds.length; i++) {
- Path file = jobConf.getLocalPath(reduceTaskId+"/"+mapTaskIds[i]+".out");
- if (getLocalFs().exists(file))
- return file;
- }
- throw new IOException("Input file not found!");
+ // TODO *oom* should use a format here
+ return this.jobConf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
}
/** Removes all of the files related to a task. */
@@ -97,9 +90,11 @@
public MapOutputFile() {
}
- public MapOutputFile(String mapTaskId, String reduceTaskId, int partition) {
+ public MapOutputFile(String mapTaskId, String reduceTaskId,
+ int mapId, int partition) {
this.mapTaskId = mapTaskId;
this.reduceTaskId = reduceTaskId;
+ this.mapId = mapId;
this.partition = partition;
}
@@ -110,6 +105,7 @@
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, mapTaskId);
UTF8.writeString(out, reduceTaskId);
+ out.writeInt(mapId);
out.writeInt(partition);
Path file = getOutputFile(mapTaskId, partition);
@@ -145,12 +141,13 @@
public void readFields(DataInput in) throws IOException {
this.mapTaskId = UTF8.readString(in);
this.reduceTaskId = UTF8.readString(in);
+ this.mapId = in.readInt();
this.partition = in.readInt();
ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
// read the length-prefixed file content into a local file
- Path file = getInputFile(mapTaskId, reduceTaskId);
+ Path file = getInputFile(mapId, reduceTaskId);
long length = in.readLong();
float progPerByte = 1.0f / length;
long unread = length;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon May 15 12:13:33 2006
@@ -34,6 +34,7 @@
}
private String mapTaskId;
+ private int mapId;
private String host;
private int port;
@@ -42,14 +43,24 @@
}
/** Construct a location. */
- public MapOutputLocation(String mapTaskId, String host, int port) {
+ public MapOutputLocation(String mapTaskId, int mapId,
+ String host, int port) {
this.mapTaskId = mapTaskId;
+ this.mapId = mapId;
this.host = host;
this.port = port;
}
/** The map task id. */
public String getMapTaskId() { return mapTaskId; }
+
+ /**
+ * Get the map's id number.
+ * @return The numeric id for this map
+ */
+ public int getMapId() {
+ return mapId;
+ }
/** The host the task completed on. */
public String getHost() { return host; }
@@ -59,12 +70,14 @@
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, mapTaskId);
+ out.writeInt(mapId);
UTF8.writeString(out, host);
out.writeInt(port);
}
public void readFields(DataInput in) throws IOException {
this.mapTaskId = UTF8.readString(in);
+ this.mapId = in.readInt();
this.host = UTF8.readString(in);
this.port = in.readInt();
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java Mon May 15 12:13:33 2006
@@ -26,6 +26,6 @@
/** Returns the output from the named map task destined for this partition.*/
MapOutputFile getFile(String mapTaskId, String reduceTaskId,
- IntWritable partition) throws IOException;
+ int mapId, int partition) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 15 12:13:33 2006
@@ -36,7 +36,8 @@
});
}
- private String[][] mapTaskIds;
+ private UTF8 jobId = new UTF8();
+ private int numMaps;
private int partition;
private boolean sortComplete;
@@ -51,10 +52,11 @@
public ReduceTask() {}
- public ReduceTask(String jobFile, String taskId,
- String[][] mapTaskIds, int partition) {
+ public ReduceTask(String jobId, String jobFile, String taskId,
+ int numMaps, int partition) {
super(jobFile, taskId);
- this.mapTaskIds = mapTaskIds;
+ this.jobId.set(jobId);
+ this.numMaps = numMaps;
this.partition = partition;
}
@@ -66,34 +68,30 @@
return false;
}
- public String[][] getMapTaskIds() { return mapTaskIds; }
+ /**
+ * Get the job name for this task.
+ * @return the job name
+ */
+ public UTF8 getJobId() {
+ return jobId;
+ }
+
+ public int getNumMaps() { return numMaps; }
public int getPartition() { return partition; }
public void write(DataOutput out) throws IOException {
super.write(out);
- out.writeInt(mapTaskIds.length); // write mapTaskIds
- for (int i = 0; i < mapTaskIds.length; i++) {
- out.writeInt(mapTaskIds[i].length);
- for (int j = 0; j < mapTaskIds[i].length; j++) {
- UTF8.writeString(out, mapTaskIds[i][j]);
- }
- }
-
+ jobId.write(out);
+ out.writeInt(numMaps); // write the number of maps
out.writeInt(partition); // write partition
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- mapTaskIds = new String[in.readInt()][]; // read mapTaskIds
- for (int i = 0; i < mapTaskIds.length; i++) {
- mapTaskIds[i] = new String[in.readInt()];
- for (int j = 0; j < mapTaskIds[i].length; j++) {
- mapTaskIds[i][j] = UTF8.readString(in);
- }
- }
-
+ jobId.readFields(in);
+ numMaps = in.readInt();
this.partition = in.readInt(); // read partition
}
@@ -189,15 +187,15 @@
new SequenceFile.Writer(lfs, file, keyClass, valueClass);
try {
// append all input files into a single input file
- for (int i = 0; i < mapTaskIds.length; i++) {
+ for (int i = 0; i < numMaps; i++) {
appendPhase.addPhase(); // one per file
}
DataOutputBuffer buffer = new DataOutputBuffer();
- for (int i = 0; i < mapTaskIds.length; i++) {
+ for (int i = 0; i < numMaps; i++) {
Path partFile =
- this.mapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
+ this.mapOutputFile.getInputFile(i, getTaskId());
float progPerByte = 1.0f / lfs.getLength(partFile);
Progress phase = appendPhase.phase();
phase.setStatus(partFile.toString());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon May 15 12:13:33 2006
@@ -15,9 +15,7 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import java.io.*;
@@ -27,8 +25,6 @@
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
- private static final Logger LOG =
- LogFormatter.getLogger("org.apache.hadoop.mapred.ReduceTaskRunner");
private MapOutputFile mapOutputFile;
public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
@@ -41,13 +37,13 @@
public boolean prepare() throws IOException {
ReduceTask task = ((ReduceTask)getTask());
this.mapOutputFile.removeAll(task.getTaskId()); // cleanup from failures
- String[][] mapTaskIds = task.getMapTaskIds();
+ int numMaps = task.getNumMaps();
final Progress copyPhase = getTask().getProgress().phase();
// we need input from every map task
- Vector needed = new Vector();
- for (int i = 0; i < mapTaskIds.length; i++) {
- needed.add(mapTaskIds[i]);
+ List needed = new ArrayList(numMaps);
+ for (int i = 0; i < numMaps; i++) {
+ needed.add(new Integer(i));
copyPhase.addPhase(); // add sub-phase per file
}
@@ -59,15 +55,17 @@
// query for a just a random subset of needed segments so that we don't
// overwhelm jobtracker. ideally perhaps we could send a more compact
// representation of all needed, i.e., a bit-vector
- Collections.shuffle(needed);
int checkSize = Math.min(10, needed.size());
- String[][] neededStrings = new String[checkSize][];
+ int[] neededIds = new int[checkSize];
+ Collections.shuffle(needed);
+ ListIterator itr = needed.listIterator();
for (int i = 0; i < checkSize; i++) {
- neededStrings[i] = (String[]) needed.elementAt(i);
+ neededIds[i] = ((Integer) itr.next()).intValue();
}
MapOutputLocation[] locs = null;
try {
- locs = jobClient.locateMapOutputs(task.getTaskId(), neededStrings);
+ locs = jobClient.locateMapOutputs(task.getJobId().toString(),
+ neededIds, task.getPartition());
} catch (IOException ie) {
LOG.info("Problem locating map outputs: " +
StringUtils.stringifyException(ie));
@@ -112,18 +110,15 @@
LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId()
+" output from "+loc.getHost()+".");
client.getFile(loc.getMapTaskId(), task.getTaskId(),
- new IntWritable(task.getPartition()));
+ loc.getMapId(),
+ task.getPartition());
// Success: remove from 'needed'
- boolean foundit = false;
- for (Iterator it = needed.iterator(); it.hasNext() && !foundit; ) {
- String idsForSingleMap[] = (String[]) it.next();
- for (int j = 0; j < idsForSingleMap.length; j++) {
- if (idsForSingleMap[j].equals(loc.getMapTaskId())) {
- it.remove();
- foundit = true;
- break;
- }
+ for (Iterator it = needed.iterator(); it.hasNext(); ) {
+ int mapId = ((Integer) it.next()).intValue();
+ if (mapId == loc.getMapId()) {
+ it.remove();
+ break;
}
}
copyPhase.startNextPhase();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon May 15 12:13:33 2006
@@ -52,7 +52,7 @@
// Defines the TIP
private String jobFile = null;
private FileSplit split = null;
- private TaskInProgress predecessors[] = null;
+ private int numMaps;
private int partition;
private JobTracker jobtracker;
private String id;
@@ -95,11 +95,11 @@
* Constructor for ReduceTask
*/
public TaskInProgress(String uniqueString, String jobFile,
- TaskInProgress predecessors[],
+ int numMaps,
int partition, JobTracker jobtracker, JobConf conf,
JobInProgress job) {
this.jobFile = jobFile;
- this.predecessors = predecessors;
+ this.numMaps = numMaps;
this.partition = partition;
this.jobtracker = jobtracker;
this.job = job;
@@ -439,11 +439,8 @@
if (isMapTask()) {
t = new MapTask(jobFile, taskid, split);
} else {
- String mapIdPredecessors[][] = new String[predecessors.length][];
- for (int i = 0; i < mapIdPredecessors.length; i++) {
- mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds();
- }
- t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition);
+ t = new ReduceTask(job.getProfile().getJobId(), jobFile, taskid,
+ numMaps, partition);
}
t.setConf(conf);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 15 12:13:33 2006
@@ -730,9 +730,9 @@
// MapOutputProtocol
/////////////////////////////////////////////////////////////////
public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
- IntWritable partition) {
- MapOutputFile mapOutputFile = new MapOutputFile(mapTaskId, reduceTaskId,
- partition.get());
+ int mapId, int partition) {
+ MapOutputFile mapOutputFile =
+ new MapOutputFile(mapTaskId, reduceTaskId, mapId, partition);
mapOutputFile.setConf(this.fConf);
return mapOutputFile;
}