You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/15 21:13:34 UTC

svn commit: r406718 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Mon May 15 12:13:33 2006
New Revision: 406718

URL: http://svn.apache.org/viewcvs?rev=406718&view=rev
Log:
HADOOP-200.  Avoid transmitting entire list of map task names to reduce tasks.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 12:13:33 2006
@@ -21,6 +21,11 @@
     and miss his heartbeat window. This was killing many task trackers as
     big jobs finished (300+ tasks / node). (omalley via cutting)
 
+ 6. HADOOP-200. Avoid transmitting entire list of map task names to
+    reduce tasks.  Instead just transmit the number of map tasks and
+    henceforth refer to them by number when collecting map output.
+    (omalley via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Mon May 15 12:13:33 2006
@@ -48,12 +48,14 @@
 
   /** Called by a reduce task to find which map tasks are completed.
    *
-   * @param taskId the reduce task id
-   * @param mapTasksNeeded an array of UTF8 naming map task ids whose output is needed.
+   * @param jobId the job id
+   * @param mapTasksNeeded an array of the mapIds that we need
+   * @param partition the reduce's id
    * @return an array of MapOutputLocation
    */
-  MapOutputLocation[] locateMapOutputs(String taskId, 
-                                       String[][] mapTasksNeeded
+  MapOutputLocation[] locateMapOutputs(String jobId, 
+                                       int[] mapTasksNeeded,
+                                       int partition
                                        ) throws IOException;
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon May 15 12:13:33 2006
@@ -151,7 +151,8 @@
         //
         this.reduces = new TaskInProgress[numReduceTasks];
         for (int i = 0; i < numReduceTasks; i++) {
-            reduces[i] = new TaskInProgress(uniqueString, jobFile, maps, i, 
+            reduces[i] = new TaskInProgress(uniqueString, jobFile, 
+                                            numMapTasks, i, 
                                             jobtracker, conf, this);
         }
 
@@ -582,16 +583,34 @@
       * Return the TaskInProgress that matches the tipid.
       */
     public TaskInProgress getTaskInProgress(String tipid){
-        for (int i = 0; i < maps.length; i++) {
-	    if (tipid.equals(maps[i].getTIPId())){
-                return maps[i];
-	    }               
-	}
-	for (int i = 0; i < reduces.length; i++) {
-	    if (tipid.equals(reduces[i].getTIPId())){
-		return reduces[i];
-            }
-	}
-	return null;
+      for (int i = 0; i < maps.length; i++) {
+        if (tipid.equals(maps[i].getTIPId())){
+          return maps[i];
+        }               
+      }
+      for (int i = 0; i < reduces.length; i++) {
+        if (tipid.equals(reduces[i].getTIPId())){
+          return reduces[i];
+        }
+      }
+      return null;
+    }
+    
+    /**
+     * Find the status of a successfully finished run of the given map.
+     * @param mapId the id of the map
+     * @return the status of the succeeded task, or null if there is none
+     */
+    public TaskStatus findFinishedMap(int mapId) {
+       TaskInProgress tip = maps[mapId];
+       if (tip.isComplete()) {
+         TaskStatus[] statuses = tip.getTaskStatuses();
+         for(int i=0; i < statuses.length; i++) {
+           if (statuses[i].getRunState() == TaskStatus.SUCCEEDED) {
+             return statuses[i];
+           }
+         }
+       }
+       return null;
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon May 15 12:13:33 2006
@@ -783,26 +783,27 @@
      * yet closed, tasks.  This exists so the reduce task thread can locate
      * map task outputs.
      */
-    public synchronized MapOutputLocation[] locateMapOutputs(String taskId, String[][] mapTasksNeeded) {
-        ArrayList v = new ArrayList();
+    public synchronized MapOutputLocation[] 
+             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
+        ArrayList result = new ArrayList(mapTasksNeeded.length);
+        JobInProgress job = getJob(jobId);
         for (int i = 0; i < mapTasksNeeded.length; i++) {
-            for (int j = 0; j < mapTasksNeeded[i].length; j++) {
-                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(mapTasksNeeded[i][j]);
-                if (tip != null && tip.isComplete(mapTasksNeeded[i][j])) {
-                    String trackerId = (String) taskidToTrackerMap.get(mapTasksNeeded[i][j]);
-                    TaskTrackerStatus tracker;
-                    synchronized (taskTrackers) {
-                      tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
-                    }
-                    v.add(new MapOutputLocation(mapTasksNeeded[i][j], tracker.getHost(), tracker.getPort()));
-                    break;
-                }
-            }
+          TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
+          if (status != null) {
+             String trackerId = 
+               (String) taskidToTrackerMap.get(status.getTaskId());
+             TaskTrackerStatus tracker;
+             synchronized (taskTrackers) {
+               tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
+             }
+             result.add(new MapOutputLocation(status.getTaskId(), 
+                                              mapTasksNeeded[i],
+                                              tracker.getHost(), 
+                                              tracker.getPort()));
+          }
         }
-        // randomly shuffle results to load-balance map output requests
-        Collections.shuffle(v);
-
-        return (MapOutputLocation[]) v.toArray(new MapOutputLocation[v.size()]);
+        return (MapOutputLocation[]) 
+               result.toArray(new MapOutputLocation[result.size()]);
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon May 15 12:13:33 2006
@@ -96,7 +96,7 @@
         for (int i = 0; i < mapIds.size(); i++) {
           String mapId = (String)mapIds.get(i);
           Path mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
-          Path reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
+          Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
           localFs.mkdirs(reduceIn.getParent());
           if (!localFs.rename(mapOut, reduceIn))
             throw new IOException("Couldn't rename " + mapOut);
@@ -104,11 +104,8 @@
         }
 
         // run a single reduce task
-        String mapDependencies[][] = new String[mapIds.size()][1];
-        for (int i = 0; i < mapIds.size(); i++) {
-            mapDependencies[i][0] = (String) mapIds.get(i);
-        }
-        ReduceTask reduce = new ReduceTask(file, reduceId, mapDependencies,0);
+        ReduceTask reduce = new ReduceTask(profile.getJobId(), file, 
+                                           reduceId, mapIds.size(),0);
         reduce.setConf(job);
         reduce_tasks += 1;
         reduce.run(job, this);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon May 15 12:13:33 2006
@@ -39,6 +39,7 @@
 
   private String mapTaskId;
   private String reduceTaskId;
+  private int mapId;
   private int partition;
   
   /** Permits reporting of file copy progress. */
@@ -66,18 +67,10 @@
    * @param mapTaskId a map task id
    * @param reduceTaskId a reduce task id
    */
-  public Path getInputFile(String mapTaskId, String reduceTaskId)
+  public Path getInputFile(int mapId, String reduceTaskId)
     throws IOException {
-    return this.jobConf.getLocalPath(reduceTaskId+"/"+mapTaskId+".out");
-  }
-  public Path getInputFile(String mapTaskIds[], String reduceTaskId)
-    throws IOException {
-    for (int i = 0; i < mapTaskIds.length; i++) {
-      Path file = jobConf.getLocalPath(reduceTaskId+"/"+mapTaskIds[i]+".out");
-      if (getLocalFs().exists(file))
-        return file;
-    }
-    throw new IOException("Input file not found!");
+    // TODO *oom* should use a format here
+    return this.jobConf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
   }
 
   /** Removes all of the files related to a task. */
@@ -97,9 +90,11 @@
   public MapOutputFile() { 
   }
   
-  public MapOutputFile(String mapTaskId, String reduceTaskId, int partition) {
+  public MapOutputFile(String mapTaskId, String reduceTaskId, 
+                       int mapId, int partition) {
     this.mapTaskId = mapTaskId;
     this.reduceTaskId = reduceTaskId;
+    this.mapId = mapId;
     this.partition = partition;
   }
 
@@ -110,6 +105,7 @@
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, mapTaskId);
     UTF8.writeString(out, reduceTaskId);
+    out.writeInt(mapId);
     out.writeInt(partition);
     
     Path file = getOutputFile(mapTaskId, partition);
@@ -145,12 +141,13 @@
   public void readFields(DataInput in) throws IOException {
     this.mapTaskId = UTF8.readString(in);
     this.reduceTaskId = UTF8.readString(in);
+    this.mapId = in.readInt();
     this.partition = in.readInt();
 
     ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
 
     // read the length-prefixed file content into a local file
-    Path file = getInputFile(mapTaskId, reduceTaskId);
+    Path file = getInputFile(mapId, reduceTaskId);
     long length = in.readLong();
     float progPerByte = 1.0f / length;
     long unread = length;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon May 15 12:13:33 2006
@@ -34,6 +34,7 @@
     }
 
   private String mapTaskId;
+  private int mapId;
   private String host;
   private int port;
 
@@ -42,14 +43,24 @@
   }
 
   /** Construct a location. */
-  public MapOutputLocation(String mapTaskId, String host, int port) {
+  public MapOutputLocation(String mapTaskId, int mapId, 
+                           String host, int port) {
     this.mapTaskId = mapTaskId;
+    this.mapId = mapId;
     this.host = host;
     this.port = port;
   }
 
   /** The map task id. */
   public String getMapTaskId() { return mapTaskId; }
+  
+  /**
+   * Get the map's id number.
+   * @return The numeric id for this map
+   */
+  public int getMapId() {
+    return mapId;
+  }
 
   /** The host the task completed on. */
   public String getHost() { return host; }
@@ -59,12 +70,14 @@
 
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, mapTaskId);
+    out.writeInt(mapId);
     UTF8.writeString(out, host);
     out.writeInt(port);
   }
 
   public void readFields(DataInput in) throws IOException {
     this.mapTaskId = UTF8.readString(in);
+    this.mapId = in.readInt();
     this.host = UTF8.readString(in);
     this.port = in.readInt();
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java Mon May 15 12:13:33 2006
@@ -26,6 +26,6 @@
 
   /** Returns the output from the named map task destined for this partition.*/
   MapOutputFile getFile(String mapTaskId, String reduceTaskId,
-                        IntWritable partition) throws IOException;
+                        int mapId, int partition) throws IOException;
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 15 12:13:33 2006
@@ -36,7 +36,8 @@
        });
   }
 
-  private String[][] mapTaskIds;
+  private UTF8 jobId = new UTF8();
+  private int numMaps;
   private int partition;
   private boolean sortComplete;
 
@@ -51,10 +52,11 @@
 
   public ReduceTask() {}
 
-  public ReduceTask(String jobFile, String taskId,
-                    String[][] mapTaskIds, int partition) {
+  public ReduceTask(String jobId, String jobFile, String taskId,
+                    int numMaps, int partition) {
     super(jobFile, taskId);
-    this.mapTaskIds = mapTaskIds;
+    this.jobId.set(jobId);
+    this.numMaps = numMaps;
     this.partition = partition;
   }
 
@@ -66,34 +68,30 @@
       return false;
   }
 
-  public String[][] getMapTaskIds() { return mapTaskIds; }
+  /**
+   * Get the job id for this task.
+   * @return the job id
+   */
+  public UTF8 getJobId() {
+    return jobId;
+  }
+  
+  public int getNumMaps() { return numMaps; }
   public int getPartition() { return partition; }
 
   public void write(DataOutput out) throws IOException {
     super.write(out);
 
-    out.writeInt(mapTaskIds.length);              // write mapTaskIds
-    for (int i = 0; i < mapTaskIds.length; i++) {
-        out.writeInt(mapTaskIds[i].length);
-        for (int j = 0; j < mapTaskIds[i].length; j++) {
-            UTF8.writeString(out, mapTaskIds[i][j]);
-        }
-    }
-
+    jobId.write(out);
+    out.writeInt(numMaps);                        // write the number of maps
     out.writeInt(partition);                      // write partition
   }
 
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
 
-    mapTaskIds = new String[in.readInt()][];        // read mapTaskIds
-    for (int i = 0; i < mapTaskIds.length; i++) {
-        mapTaskIds[i] = new String[in.readInt()];
-        for (int j = 0; j < mapTaskIds[i].length; j++) {
-            mapTaskIds[i][j] = UTF8.readString(in);
-        }
-    }
-
+    jobId.readFields(in);
+    numMaps = in.readInt();
     this.partition = in.readInt();                // read partition
   }
 
@@ -189,15 +187,15 @@
       new SequenceFile.Writer(lfs, file, keyClass, valueClass);
     try {
       // append all input files into a single input file
-      for (int i = 0; i < mapTaskIds.length; i++) {
+      for (int i = 0; i < numMaps; i++) {
         appendPhase.addPhase();                 // one per file
       }
       
       DataOutputBuffer buffer = new DataOutputBuffer();
 
-      for (int i = 0; i < mapTaskIds.length; i++) {
+      for (int i = 0; i < numMaps; i++) {
         Path partFile =
-          this.mapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
+          this.mapOutputFile.getInputFile(i, getTaskId());
         float progPerByte = 1.0f / lfs.getLength(partFile);
         Progress phase = appendPhase.phase();
         phase.setStatus(partFile.toString());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon May 15 12:13:33 2006
@@ -15,9 +15,7 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -27,8 +25,6 @@
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
-  private static final Logger LOG =
-    LogFormatter.getLogger("org.apache.hadoop.mapred.ReduceTaskRunner");
   private MapOutputFile mapOutputFile;
 
   public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
@@ -41,13 +37,13 @@
   public boolean prepare() throws IOException {
     ReduceTask task = ((ReduceTask)getTask());
     this.mapOutputFile.removeAll(task.getTaskId());    // cleanup from failures
-    String[][] mapTaskIds = task.getMapTaskIds();
+    int numMaps = task.getNumMaps();
     final Progress copyPhase = getTask().getProgress().phase();
 
     // we need input from every map task
-    Vector needed = new Vector();
-    for (int i = 0; i < mapTaskIds.length; i++) {
-      needed.add(mapTaskIds[i]);
+    List needed = new ArrayList(numMaps);
+    for (int i = 0; i < numMaps; i++) {
+      needed.add(new Integer(i));
       copyPhase.addPhase();                       // add sub-phase per file
     }
 
@@ -59,15 +55,17 @@
       // query for a just a random subset of needed segments so that we don't
       // overwhelm jobtracker.  ideally perhaps we could send a more compact
       // representation of all needed, i.e., a bit-vector
-      Collections.shuffle(needed);
       int checkSize = Math.min(10, needed.size());
-      String[][] neededStrings = new String[checkSize][];
+      int[] neededIds = new int[checkSize];
+      Collections.shuffle(needed);
+      ListIterator itr = needed.listIterator();
       for (int i = 0; i < checkSize; i++) {
-          neededStrings[i] = (String[]) needed.elementAt(i);
+        neededIds[i] = ((Integer) itr.next()).intValue();
       }
       MapOutputLocation[] locs = null;
       try {
-        locs = jobClient.locateMapOutputs(task.getTaskId(), neededStrings);
+        locs = jobClient.locateMapOutputs(task.getJobId().toString(), 
+                                          neededIds, task.getPartition());
       } catch (IOException ie) {
         LOG.info("Problem locating map outputs: " + 
                  StringUtils.stringifyException(ie));
@@ -112,18 +110,15 @@
           LOG.info(task.getTaskId()+" Copying "+loc.getMapTaskId()
                    +" output from "+loc.getHost()+".");
           client.getFile(loc.getMapTaskId(), task.getTaskId(),
-                         new IntWritable(task.getPartition()));
+                         loc.getMapId(),
+                         task.getPartition());
 
           // Success: remove from 'needed'
-          boolean foundit = false;
-          for (Iterator it = needed.iterator(); it.hasNext() && !foundit; ) {
-              String idsForSingleMap[] = (String[]) it.next();
-              for (int j = 0; j < idsForSingleMap.length; j++) {
-                  if (idsForSingleMap[j].equals(loc.getMapTaskId())) {
-                      it.remove();
-                      foundit = true;
-                      break;
-                  }
+          for (Iterator it = needed.iterator(); it.hasNext(); ) {
+              int mapId = ((Integer) it.next()).intValue();
+              if (mapId == loc.getMapId()) {
+                it.remove();
+                break;
               }
           }
           copyPhase.startNextPhase();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon May 15 12:13:33 2006
@@ -52,7 +52,7 @@
     // Defines the TIP
     private String jobFile = null;
     private FileSplit split = null;
-    private TaskInProgress predecessors[] = null;
+    private int numMaps;
     private int partition;
     private JobTracker jobtracker;
     private String id;
@@ -95,11 +95,11 @@
      * Constructor for ReduceTask
      */
     public TaskInProgress(String uniqueString, String jobFile, 
-                          TaskInProgress predecessors[], 
+                          int numMaps, 
                           int partition, JobTracker jobtracker, JobConf conf,
                           JobInProgress job) {
         this.jobFile = jobFile;
-        this.predecessors = predecessors;
+        this.numMaps = numMaps;
         this.partition = partition;
         this.jobtracker = jobtracker;
         this.job = job;
@@ -439,11 +439,8 @@
             if (isMapTask()) {
                 t = new MapTask(jobFile, taskid, split);
             } else {
-                String mapIdPredecessors[][] = new String[predecessors.length][];
-                for (int i = 0; i < mapIdPredecessors.length; i++) {
-                    mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds();
-                }
-                t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition);
+                t = new ReduceTask(job.getProfile().getJobId(), jobFile, taskid, 
+                                   numMaps, partition);
             }
             t.setConf(conf);
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=406718&r1=406717&r2=406718&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 15 12:13:33 2006
@@ -730,9 +730,9 @@
     // MapOutputProtocol
     /////////////////////////////////////////////////////////////////
     public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
-      IntWritable partition) {
-    MapOutputFile mapOutputFile = new MapOutputFile(mapTaskId, reduceTaskId,
-        partition.get());
+                                 int mapId, int partition) {
+    MapOutputFile mapOutputFile = 
+      new MapOutputFile(mapTaskId, reduceTaskId, mapId, partition);
     mapOutputFile.setConf(this.fConf);
     return mapOutputFile;
   }