You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/21 21:40:25 UTC

svn commit: r477876 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Nov 21 12:40:24 2006
New Revision: 477876

URL: http://svn.apache.org/viewvc?view=rev&rev=477876
Log:
HADOOP-76.  Implement speculative reduce.  Contributed by Sanjay.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 21 12:40:24 2006
@@ -116,6 +116,12 @@
     for sorting datanode list by various columns.
     (Raghu Angadi via cutting)
 
+35. HADOOP-76.  Implement speculative reduce.  Now when a job is
+    configured for speculative execution, both maps and reduces will
+    execute speculatively.  Reduce outputs are written to temporary
+    location and moved to the final location when reduce is complete.
+    (Sanjay Dahiya via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Nov 21 12:40:24 2006
@@ -154,12 +154,12 @@
       FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
                                       conf.getLong("map.input.start", 0),
                                       conf.getLong("map.input.length", 0));
-      task = new MapTask(jobId, jobFilename.toString(), taskId, partition, 
-                         split);
+      task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), 
+          taskId, partition, split);
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);
-      task = new ReduceTask(jobId, jobFilename.toString(), taskId,
+      task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId, 
                             partition, numMaps);
     }
     task.setConf(conf);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Nov 21 12:40:24 2006
@@ -357,7 +357,7 @@
             return null;
         }
 
-        double avgProgress = status.reduceProgress() / reduces.length;
+        double avgProgress = status.reduceProgress() ;
         int target = findNewTask(tts, clusterSize, avgProgress, 
                                     reduces, null);
         if (target == -1) {
@@ -438,8 +438,10 @@
                 LOG.info("Choosing normal task " + tasks[i].getTIPId());
                 return i;
               } else if (specTarget == -1 &&
-                         task.hasSpeculativeTask(avgProgress)) {
+                         task.hasSpeculativeTask(avgProgress) && 
+                         ! task.hasRunOnMachine(taskTracker)) {
                 specTarget = i;
+                break ;
               }
             }
           }
@@ -691,6 +693,11 @@
         // so we remove that directory to cleanup
         FileSystem fs = FileSystem.get(conf);
         fs.delete(new Path(profile.getJobFile()).getParent());
+        
+        // Delete temp dfs dirs created if any, like in case of 
+        // speculative exn of reduces.  
+     //   String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
+     //   fs.delete(new Path(tempDir)); 
 
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Nov 21 12:40:24 2006
@@ -96,8 +96,10 @@
         // run a map task for each split
         job.setNumReduceTasks(1);                 // force a single reduce task
         for (int i = 0; i < splits.length; i++) {
-          mapIds.add("map_" + newId());
-          MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,
+          String mapId = "map_" + newId() ; 
+          mapIds.add(mapId);
+          MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, 
+                                    mapId, i,
                                     splits[i]);
           JobConf localConf = new JobConf(job);
           map.localizeConfiguration(localConf);
@@ -126,7 +128,7 @@
 
         {
           ReduceTask reduce = new ReduceTask(jobId, file, 
-                                             reduceId, 0, mapIds.size());
+                                             "tip_r_0001", reduceId, 0, mapIds.size());
           JobConf localConf = new JobConf(job);
           reduce.localizeConfiguration(localConf);
           reduce.setConf(localConf);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Nov 21 12:40:24 2006
@@ -85,9 +85,9 @@
 
   public MapTask() {}
 
-  public MapTask(String jobId, String jobFile, String taskId, 
+  public MapTask(String jobId, String jobFile, String tipId, String taskId, 
                  int partition, FileSplit split) {
-    super(jobId, jobFile, taskId, partition);
+    super(jobId, jobFile, tipId, taskId, partition);
     this.split = split;
     myMetrics = new MapTaskMetrics(taskId);
   }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=auto&rev=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Nov 21 12:40:24 2006
@@ -0,0 +1,464 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class acts as a proxy to the actual file system being used. 
+ * It writes files to a temporary location and on
+ * commit, moves to final location. On abort or a failure in 
+ * commit the temporary file is deleted  
+ * PhasedFileSystem works in context of a task. A different instance of 
+ * PhasedFileSystem should be used for every task.  
+ * Temporary files are written in  ("mapred.system.dir")/<jobid>/<taskid>
+ * If one tasks opens a large number of files in succession then its 
+ * better to commit(Path) individual files when done. Otherwise
+ * commit() can be used to commit all open files at once. 
+ */
+class PhasedFileSystem extends FileSystem {
+
+  private FileSystem baseFS ;
+  // Map from final file name to temporary file name
+  private Map<Path, FileInfo> finalNameToFileInfo = new HashMap(); 
+  
+  private String jobid ; 
+  private String tipid ; 
+  private String taskid ; 
+  
+  private Path tempDir ; 
+  /**
+   * This Constructor is used to wrap a FileSystem object to a 
+   * Phased FilsSystem.  
+   * @param fs base file system object
+   * @param jobid JobId
+   * @param tipid tipId 
+   * @param taskid taskId
+   */
+  public PhasedFileSystem(FileSystem fs, String jobid, 
+      String tipid, String taskid) {
+    super(fs.getConf()); // not used
+    
+    this.baseFS = fs ; 
+    this.jobid = jobid; 
+    this.tipid = tipid ; 
+    this.taskid = taskid ; 
+    
+    tempDir = new Path(baseFS.getConf().get("mapred.system.dir") ); 
+  }
+  /**
+   * This Constructor is used to wrap a FileSystem object to a 
+   * Phased FilsSystem.  
+   * @param fs base file system object
+   * @param conf JobConf
+   */
+  public PhasedFileSystem(FileSystem fs, JobConf conf) {
+    super(fs.getConf()); // not used
+    
+    this.baseFS = fs ; 
+    this.jobid = conf.get("mapred.job.id"); 
+    this.tipid = conf.get("mapred.tip.id"); 
+    this.taskid = conf.get("mapred.task.id") ; 
+    
+    tempDir = new Path(baseFS.getConf().get("mapred.system.dir") ); 
+  }
+  /**
+   * This Constructor should not be used in this or any derived class. 
+   * @param conf
+   */
+  protected PhasedFileSystem(Configuration conf){
+    super(conf);
+    throw new UnsupportedOperationException("Operation not supported"); 
+  }
+  
+  private Path setupFile(Path finalFile, boolean overwrite) throws IOException{
+    if( finalNameToFileInfo.containsKey(finalFile) ){
+      if( !overwrite ){
+        throw new IOException("Error, file already exists : " + 
+            finalFile.toString()); 
+      }else{
+        // delete tempp file and let create a new one. 
+        FileInfo fInfo = finalNameToFileInfo.get(finalFile); 
+        try{
+          fInfo.getOpenFileStream().close();
+        }catch(IOException ioe){
+          // ignore if already closed
+        }
+        baseFS.delete( fInfo.getTempPath() ); 
+        finalNameToFileInfo.remove(finalFile); 
+      }
+    }
+    
+    String uniqueFile = jobid + "/" + tipid + "/" + taskid + "/" + finalFile.getName();
+    
+    Path tempPath = new Path(tempDir, new Path(uniqueFile)); 
+    FileInfo fInfo = new FileInfo(tempPath, finalFile, overwrite); 
+    
+    finalNameToFileInfo.put(finalFile, fInfo);
+    
+    return tempPath ; 
+  }
+  
+  @Override
+  public FSOutputStream createRaw(
+      Path f, boolean overwrite, short replication, long blockSize)
+      throws IOException {
+    
+    // for reduce output its checked in job client but lets check it anyways
+    // as tasks with side effect may write to locations not set in jobconf
+    // as output path. 
+    if( baseFS.exists(f) && !overwrite ){
+      throw new IOException("Error creating file - already exists : " + f); 
+    }
+    FSOutputStream stream = 
+      baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, blockSize); 
+    finalNameToFileInfo.get(f).setOpenFileStream(stream); 
+    return stream; 
+  }
+
+  @Override
+  public FSOutputStream createRaw(
+      Path f, boolean overwrite, short replication, long blockSize,
+      Progressable progress)
+      throws IOException {
+    if( baseFS.exists(f) && !overwrite ){
+      throw new IOException("Error creating file - already exists : " + f); 
+    }
+    FSOutputStream stream = 
+      baseFS.createRaw(setupFile(f, overwrite), overwrite, replication, 
+          blockSize, progress);
+    finalNameToFileInfo.get(f).setOpenFileStream(stream); 
+    return stream ; 
+  }
+  /**
+   * Commits a single file file to its final locations as passed in create* methods. 
+   * If a file already exists in final location then temporary file is deleted. 
+   * @param fPath path to final file. 
+   * @throws IOException thrown if commit fails
+   */
+  public void commit(Path fPath) throws IOException{
+    commit(fPath, true); 
+  }
+  
+  // use extra method arg to avoid concurrentModificationException 
+  // if committing using this method while iterating.  
+  private void commit(Path fPath , boolean removeFromMap)throws IOException{
+    FileInfo fInfo = finalNameToFileInfo.get(fPath) ; 
+    if( null == fInfo ){
+      throw new IOException("Error committing file! File was not created " + 
+          "with PhasedFileSystem : " + fPath); 
+    }
+    try{
+      fInfo.getOpenFileStream().close();
+    }catch(IOException ioe){
+      // ignore if already closed
+      ioe.printStackTrace();
+    }
+    Path tempPath = fInfo.getTempPath(); 
+    // ignore .crc files 
+    if(! tempPath.toString().endsWith(".crc")){
+      if( !baseFS.exists(fPath) || fInfo.isOverwrite()){
+        if(! baseFS.exists(fPath.getParent())){
+          baseFS.mkdirs(fPath.getParent());
+        }
+        
+        if( baseFS.exists(fPath) && fInfo.isOverwrite()){
+          baseFS.delete(fPath); 
+        }
+        
+        try {
+          if( ! baseFS.rename(fInfo.getTempPath(), fPath) ){
+            // delete the temp file if rename failed
+            baseFS.delete(fInfo.getTempPath());
+          }
+        }catch(IOException ioe){
+          // rename failed, log error and delete temp files
+          LOG.error("PhasedFileSystem failed to commit file : " + fPath 
+              + " error : " + ioe.getMessage()); 
+          baseFS.delete(fInfo.getTempPath());
+        }
+      }else{
+        // delete temp file
+        baseFS.delete(fInfo.getTempPath());
+      }
+      // done with the file
+      if( removeFromMap ){
+        finalNameToFileInfo.remove(fPath);
+      }
+    }
+  }
+
+  /**
+   * Commits files to their final locations as passed in create* methods. 
+   * If a file already exists in final location then temporary file is deleted. 
+   * This methods ignores crc files (ending with .crc). This method doesnt close
+   * the file system so it can still be used to create new files. 
+   * @throws IOException if any file fails to commit
+   */
+  public void commit() throws IOException {
+    for( Path fPath : finalNameToFileInfo.keySet()){
+      commit(fPath, false);  
+    }
+    // safe to clear map now
+    finalNameToFileInfo.clear();
+  } 
+  /**
+   * Aborts a single file. The temporary created file is deleted. 
+   * @param p the path to final file as passed in create* call
+   * @throws IOException if File delete fails  
+   */
+  public void abort(Path p)throws IOException{
+    abort(p, true); 
+  }
+  
+  // use extra method arg to avoid concurrentModificationException 
+  // if aborting using this method while iterating.  
+  private void abort(Path p, boolean removeFromMap) throws IOException{
+    FileInfo fInfo = finalNameToFileInfo.get(p); 
+    if( null != fInfo ){
+      try{
+        fInfo.getOpenFileStream().close();
+      }catch(IOException ioe){
+        // ignore if already closed
+      }
+      baseFS.delete(fInfo.getTempPath()); 
+      if( removeFromMap ){
+        finalNameToFileInfo.remove(p);
+      }
+    }
+  }
+  /**
+   * Aborts the file creation, all uncommitted files created by this PhasedFileSystem 
+   * instance are deleted. This does not close baseFS because handle to baseFS may still 
+   * exist can be used to create new files. 
+   * 
+   * @throws IOException
+   */
+  public void abort() throws IOException {
+    for(Path fPath : finalNameToFileInfo.keySet() ){
+      abort(fPath, false); 
+    }
+    // safe to clean now
+    finalNameToFileInfo.clear();
+  }
+  /**
+   * Closes base file system. 
+   */
+  public void close() throws IOException { 
+    baseFS.close(); 
+  } 
+  
+  @Override
+  public short getReplication(
+      Path src)
+      throws IOException {
+    // keep replication same for temp file as for 
+    // final file. 
+    return baseFS.getReplication(src);
+  }
+
+  @Override
+  public boolean setReplicationRaw(
+      Path src, short replication)
+      throws IOException {
+    // throw IOException for interface compatibility with 
+    // base class. 
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public boolean renameRaw(
+      Path src, Path dst)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public boolean deleteRaw(
+      Path f)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public boolean exists(Path f)
+      throws IOException {
+    return baseFS.exists(f);
+  }
+
+  @Override
+  public boolean isDirectory(Path f)
+      throws IOException {
+    return baseFS.isDirectory(f);  
+  }
+
+  @Override
+  public long getLength(Path f)
+      throws IOException {
+    return baseFS.getLength(f); 
+  }
+
+  @Override
+  public Path[] listPathsRaw(Path f)
+      throws IOException {
+    return baseFS.listPathsRaw(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    baseFS.setWorkingDirectory(new_dir);   
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return baseFS.getWorkingDirectory();  
+  }
+
+  @Override
+  public boolean mkdirs(Path f)
+      throws IOException {
+    return baseFS.mkdirs(f) ;
+  }
+
+  @Override
+  public void lock(
+      Path f, boolean shared)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public void release(
+      Path f)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public void copyFromLocalFile(
+      Path src, Path dst)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public void moveFromLocalFile(
+      Path src, Path dst)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public void copyToLocalFile(
+      Path src, Path dst)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public Path startLocalOutput(
+      Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+ }
+
+  @Override
+  public void completeLocalOutput(
+      Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+ }
+
+  @Override
+  public void reportChecksumFailure(
+      Path f, FSInputStream in, long start, long length, int crc) {
+    baseFS.reportChecksumFailure(f, in, start, length, crc); 
+  }
+
+  @Override
+  public long getBlockSize(
+      Path f)
+      throws IOException {
+    return baseFS.getBlockSize(f);
+  }
+
+  @Override
+  public long getDefaultBlockSize() {
+    return baseFS.getDefaultBlockSize();
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return baseFS.getDefaultReplication();
+  }
+
+  @Override
+  public String[][] getFileCacheHints(
+      Path f, long start, long len)
+      throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public String getName() {
+    throw new UnsupportedOperationException("Operation not supported");  
+  }
+
+  @Override
+  public FSInputStream openRaw(Path f)
+      throws IOException {
+    return baseFS.openRaw(f);   
+  }
+  
+  private class FileInfo {
+    private Path tempPath ;
+    private Path finalPath ; 
+    private FSOutputStream openFileStream ; 
+    private boolean overwrite ;
+    
+    FileInfo(Path tempPath, Path finalPath, boolean overwrite){
+      this.tempPath = tempPath ; 
+      this.finalPath = finalPath ; 
+      this.overwrite = overwrite; 
+    }
+    public FSOutputStream getOpenFileStream() {
+      return openFileStream;
+    }
+    public void setOpenFileStream(
+        FSOutputStream openFileStream) {
+      this.openFileStream = openFileStream;
+    }
+    public Path getFinalPath() {
+      return finalPath;
+    }
+    public void setFinalPath(
+        Path finalPath) {
+      this.finalPath = finalPath;
+    }
+    public boolean isOverwrite() {
+      return overwrite;
+    }
+    public void setOverwrite(
+        boolean overwrite) {
+      this.overwrite = overwrite;
+    }
+    public Path getTempPath() {
+      return tempPath;
+    }
+    public void setTempPath(
+        Path tempPath) {
+      this.tempPath = tempPath;
+    }
+    
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 21 12:40:24 2006
@@ -80,9 +80,9 @@
 
   public ReduceTask() {}
 
-  public ReduceTask(String jobId, String jobFile, String taskId,
+  public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
                     int partition, int numMaps) {
-    super(jobId, jobFile, taskId, partition);
+    super(jobId, jobFile, tipId, taskId, partition);
     this.numMaps = numMaps;
     myMetrics = new ReduceTaskMetrics(taskId);
   }
@@ -272,9 +272,18 @@
     Reporter reporter = getReporter(umbilical, getProgress());
     
     // make output collector
-    String name = getOutputName(getPartition());
-    final RecordWriter out =
-      job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name, reporter);
+    String finalName = getOutputName(getPartition());
+    boolean runSpeculative = job.getSpeculativeExecution();
+    FileSystem fs = FileSystem.get(job) ;
+
+    if( runSpeculative ){
+        fs = new PhasedFileSystem (fs , 
+                      getJobId(), getTipId(), getTaskId());
+    }
+    
+    final RecordWriter out = 
+      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter) ;  
+    
     OutputCollector collector = new OutputCollector() {
         public void collect(WritableComparable key, Writable value)
           throws IOException {
@@ -299,6 +308,9 @@
     } finally {
       reducer.close();
       out.close(reporter);
+      if( runSpeculative ){
+        ((PhasedFileSystem)fs).commit(); 
+       }
     }
     done(umbilical);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Nov 21 12:40:24 2006
@@ -38,6 +38,7 @@
   private String jobFile;                         // job configuration file
   private String taskId;                          // unique, includes job id
   private String jobId;                           // unique jobid
+  private String tipId ;
   private int partition;                          // id within job
   private TaskStatus.Phase phase ;                         // current phase of the task 
 
@@ -47,10 +48,12 @@
 
   public Task() {}
 
-  public Task(String jobId, String jobFile, String taskId, int partition) {
+  public Task(String jobId, String jobFile, String tipId, 
+      String taskId, int partition) {
     this.jobFile = jobFile;
     this.taskId = taskId;
     this.jobId = jobId;
+    this.tipId = tipId; 
     this.partition = partition;
   }
 
@@ -60,6 +63,7 @@
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
   public String getTaskId() { return taskId; }
+  public String getTipId(){ return tipId ; }
   
   /**
    * Get the job name for this task.
@@ -97,12 +101,14 @@
 
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, jobFile);
+    UTF8.writeString(out, tipId); 
     UTF8.writeString(out, taskId);
     UTF8.writeString(out, jobId);
     out.writeInt(partition);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = UTF8.readString(in);
+    tipId = UTF8.readString(in);
     taskId = UTF8.readString(in);
     jobId = UTF8.readString(in);
     partition = in.readInt();
@@ -114,6 +120,7 @@
    * Localize the given JobConf to be specific for this task.
    */
   public void localizeConfiguration(JobConf conf) {
+    conf.set("mapred.tip.id", tipId); 
     conf.set("mapred.task.id", taskId);
     conf.setBoolean("mapred.task.is.map",isMapTask());
     conf.setInt("mapred.task.partition", partition);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Nov 21 12:40:24 2006
@@ -18,10 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.*;
-import org.apache.hadoop.util.*;
 
 import java.text.NumberFormat;
-import java.io.*;
 import java.util.*;
 
 
@@ -44,6 +42,7 @@
     static final int MAX_TASK_FAILURES = 4;    
     static final double SPECULATIVE_GAP = 0.2;
     static final long SPECULATIVE_LAG = 60 * 1000;
+    static final int MAX_CONCURRENT_TASKS = 2; 
     private static NumberFormat idFormat = NumberFormat.getInstance();
     static {
       idFormat.setMinimumIntegerDigits(6);
@@ -73,7 +72,9 @@
     private boolean failed = false;
     private boolean killed = false;
     private TreeSet usableTaskIds = new TreeSet();
-    private TreeSet recentTasks = new TreeSet();
+    // Map from task Id -> TaskTracker Id, contains tasks that are
+    // currently runnings
+    private TreeMap<String, String> activeTasks = new TreeMap();
     private JobConf conf;
     private boolean runSpeculative;
     private Map<String,List<String>> taskDiagnosticData = new TreeMap();
@@ -176,7 +177,7 @@
      * @return true if any tasks are running
      */
     public boolean isRunning() {
-      return !recentTasks.isEmpty();
+      return !activeTasks.isEmpty();
     }
     
     /**
@@ -326,8 +327,8 @@
               status.setFinishTime(System.currentTimeMillis());
             }
         }
-        this.recentTasks.remove(taskid);
-        if (this.completes > 0) {
+        this.activeTasks.remove(taskid);
+        if (this.completes > 0 && this.isMapTask()) {
             this.completes--;
         }
 
@@ -347,7 +348,7 @@
         LOG.info("Task '" + taskid + "' has completed.");
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
         status.setRunState(TaskStatus.State.SUCCEEDED);
-        recentTasks.remove(taskid);
+        activeTasks.remove(taskid);
 
         //
         // Now that the TIP is complete, the other speculative 
@@ -445,11 +446,19 @@
         // in more depth eventually...
         //
         if (isMapTask() &&
-            recentTasks.size() <= MAX_TASK_EXECS &&
+            activeTasks.size() <= MAX_TASK_EXECS &&
             runSpeculative &&
             (averageProgress - progress >= SPECULATIVE_GAP) &&
             (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
             return true;
+        }else{
+          //Note: validate criteria for speculative reduce execution
+          if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) && 
+              (averageProgress - progress >= SPECULATIVE_GAP) &&
+              completes <= 0 &&
+              (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
+            return true ; 
+          }
         }
         return false;
     }
@@ -469,13 +478,13 @@
         String jobId = job.getProfile().getJobId();
 
         if (isMapTask()) {
-          t = new MapTask(jobId, jobFile, taskid, partition, split);
+          t = new MapTask(jobId, jobFile, this.id, taskid, partition, split);
         } else {
-          t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
+          t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
         }
         t.setConf(conf);
 
-        recentTasks.add(taskid);
+        activeTasks.put(taskid, taskTracker);
 
         // Ask JobTracker to note that the task exists
         jobtracker.createTaskEntry(taskid, taskTracker, this);
@@ -491,6 +500,15 @@
       return machinesWhereFailed.contains(tracker);
     }
     
+    /**
+     * Was this task ever scheduled to run on this machine?
+     * @param tracker The task tracker name
+     * @return Was task scheduled on the tracker?
+     */
+    public boolean hasRunOnMachine(String tracker){
+      return this.activeTasks.values().contains(tracker) || 
+               hasFailedOnMachine(tracker) ;
+    }
     /**
      * Get the number of machines where this task has failed.
      * @return the size of the failed machine set

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=477876&r1=477875&r2=477876
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 21 12:40:24 2006
@@ -1044,12 +1044,33 @@
                   failures += 1;
                 }
                 runner.kill();
+                runstate = TaskStatus.State.KILLED;
             } else if (runstate == TaskStatus.State.UNASSIGNED) {
               if (wasFailure) {
                 failures += 1;
                 runstate = TaskStatus.State.FAILED;
               } else {
                 runstate = TaskStatus.State.KILLED;
+              }
+            }
+            
+            // the temporary file names in speculative exn are generated in 
+            // the launched JVM, and we dont talk to it when killing so cleanup
+            // should happen here. find the task id and delete the temp directory 
+            // for the task. only for killed speculative reduce instances
+            
+            // Note: it would be better to couple this with delete localfiles
+            // which is in conf currently, it doenst belong there. 
+
+            if( !task.isMapTask() && 
+                this.defaultJobConf.getSpeculativeExecution() ){
+              try{
+                String systemDir = task.getConf().get("mapred.system.dir");
+                String taskTempDir = systemDir + "/" + 
+                    task.getJobId() + "/" + task.getTipId();
+                fs.delete(new Path(taskTempDir)) ;
+              }catch(IOException e){
+                LOG.warn("Error in deleting reduce temporary output",e); 
               }
             }
         }