You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/26 21:43:35 UTC

svn commit: r409723 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/webapps/task/

Author: cutting
Date: Fri May 26 12:43:32 2006
New Revision: 409723

URL: http://svn.apache.org/viewvc?rev=409723&view=rev
Log:
HADOOP-254.  Use HTTP to transfer map output data to reduce nodes.  Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
Removed:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May 26 12:43:32 2006
@@ -69,6 +69,10 @@
     LocalFileSystem to include the name of the file.
     (Benjamin Reed via cutting)
 
+19. HADOOP-254.  Use HTTP to transfer map output data to reduce
+    nodes.  This, together with HADOOP-195, greatly improves the
+    performance of these transfers.  (omalley via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri May 26 12:43:32 2006
@@ -302,6 +302,20 @@
   </description>
 </property>
 
+<property>
+  <name>tasktracker.http.threads</name>
+  <value>40</value>
+  <description>The number of worker threads that for the http server. This is
+               used for map output fetching
+  </description>
+</property>
+
+<property>
+  <name>tasktracker.http.port</name>
+  <value>50060</value>
+  <description>The default port for task trackers to use as their http server.
+  </description>
+</property>
 
 <!-- ipc properties -->
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri May 26 12:43:32 2006
@@ -23,7 +23,6 @@
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.util.logging.*;
 import java.io.*;
 
@@ -186,65 +185,6 @@
     
     return values;
   }
-
-  
-  /** Expert: Make an RPC call over the specified socket. Assumes that no other calls 
-   * are in flight on this connection. */ 
-  public static Object callRaw(Method method, Object[] params,
-                               Socket sock, Configuration conf)
-    throws IOException {  
-    
-    Invocation inv = new Invocation(method, params);    
-    DataInputStream in =
-      new DataInputStream(new BufferedInputStream(sock.getInputStream()));              
-    DataOutputStream out = 
-      new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));    
-    String name = new String("Client connection to " +
-                             sock.getInetAddress().getHostName() +
-                             ":" + sock.getPort());
-    
-    try {
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.fine(name + " sending #0");
-      }
- 
-      // write out method invocation
-      out.writeInt(0);
-      inv.write(out);
-      out.flush();
-      
-      // read return value
-      int callId = in.readInt();
-      
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.fine(name + " got response to call #" + callId);
-      }
-      
-      boolean isError = in.readBoolean();
-      if (isError) {
-        throw new RemoteException(WritableUtils.readString(in),
-                                  WritableUtils.readString(in));
-      }
-      else {
-
-        Writable wrappedValue = (Writable)ObjectWritable.class.newInstance();        
-        if (wrappedValue instanceof Configurable) {
-          ((Configurable) wrappedValue).setConf(conf);
-        }
-        wrappedValue.readFields(in);
-
-        return method.getReturnType() != Void.TYPE ?
-          ((ObjectWritable)wrappedValue).get() : null;
-      }
-    }
-    catch (InstantiationException e) {
-      throw new IOException(e.toString());
-    }
-    catch (IllegalAccessException e) {
-      throw new IOException(e.toString());
-    }
-  }
-  
 
   /** Construct a server for a protocol implementation instance listening on a
    * port. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May 26 12:43:32 2006
@@ -434,7 +434,7 @@
         }
 
         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
-        this.infoServer = new StatusHttpServer("job", infoPort);
+        this.infoServer = new StatusHttpServer("job", infoPort, false);
         this.infoServer.start();
 
         this.startTime = System.currentTimeMillis();
@@ -821,7 +821,7 @@
              result.add(new MapOutputLocation(status.getTaskId(), 
                                               mapTasksNeeded[i],
                                               tracker.getHost(), 
-                                              tracker.getPort()));
+                                              tracker.getHttpPort()));
           }
         }
         return (MapOutputLocation[]) 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Fri May 26 12:43:32 2006
@@ -17,51 +17,24 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.logging.Level;
 
-import java.io.*;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskTracker.MapOutputServer;
 
-/** A local file to be transferred via the {@link MapOutputProtocol}. */ 
-class MapOutputFile implements Writable, Configurable {
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ */ 
+class MapOutputFile {
 
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (MapOutputFile.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new MapOutputFile(); }
-         });
-    }
-
-  private String mapTaskId;
-  private String reduceTaskId;
-  private int mapId;
-  private int partition;
-  private long size;
+  private JobConf conf;
   
-  /** Permits reporting of file copy progress. */
-  public interface ProgressReporter {
-    void progress(float progress) throws IOException;
-  }
-
-  private ThreadLocal REPORTERS = new ThreadLocal();
-  private JobConf jobConf;
-  
-  public void setProgressReporter(ProgressReporter reporter) {
-    REPORTERS.set(reporter);
-  }
-
   /** Create a local map output file name.
    * @param mapTaskId a map task id
    * @param partition a reduce partition
    */
   public Path getOutputFile(String mapTaskId, int partition)
     throws IOException {
-    return this.jobConf.getLocalPath(mapTaskId+"/part-"+partition+".out");
+    return conf.getLocalPath(mapTaskId+"/part-"+partition+".out");
   }
 
   /** Create a local reduce input file name.
@@ -71,12 +44,12 @@
   public Path getInputFile(int mapId, String reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return this.jobConf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
+    return conf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(String taskId) throws IOException {
-    this.jobConf.deleteLocalFiles(taskId);
+    conf.deleteLocalFiles(taskId);
   }
 
   /** 
@@ -84,108 +57,14 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.jobConf.deleteLocalFiles();
-  }
-
-  /** Construct a file for transfer. */
-  public MapOutputFile() { 
-  }
-  
-  public MapOutputFile(String mapTaskId, String reduceTaskId, 
-                       int mapId, int partition) {
-    this.mapTaskId = mapTaskId;
-    this.reduceTaskId = reduceTaskId;
-    this.mapId = mapId;
-    this.partition = partition;
-  }
-
-  private FileSystem getLocalFs() throws IOException {
-    return FileSystem.getNamed("local", this.jobConf);
-  }
-  
-  public long getSize() {
-    return size;
-  }
-
-  public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, mapTaskId);
-    UTF8.writeString(out, reduceTaskId);
-    out.writeInt(mapId);
-    out.writeInt(partition);
-    
-    Path file = getOutputFile(mapTaskId, partition);
-    FSDataInputStream in = null;
-    try {
-      // write the length-prefixed file content to the wire
-      this.size = getLocalFs().getLength(file);
-      out.writeLong(this.size);
-      in = getLocalFs().open(file);
-    } catch (FileNotFoundException e) {
-      TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
-      ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
-      throw e;
-    }
-    try {
-      byte[] buffer = new byte[65536];
-      int l  = 0;
-      
-      while (l != -1) {
-        out.write(buffer, 0, l);
-        try {
-          l = in.read(buffer);
-        } catch (IOException e) {
-          TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e);
-          ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
-          throw e;
-        }
-      }
-    } finally {
-      in.close();
-    }
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    this.mapTaskId = UTF8.readString(in);
-    this.reduceTaskId = UTF8.readString(in);
-    this.mapId = in.readInt();
-    this.partition = in.readInt();
-
-    ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
-
-    // read the length-prefixed file content into a local file
-    Path file = getInputFile(mapId, reduceTaskId);
-    long length = in.readLong();
-    this.size = length;
-    
-    float progPerByte = 1.0f / length;
-    long unread = length;
-    FSDataOutputStream out = getLocalFs().create(file);
-    try {
-      byte[] buffer = new byte[65536];
-      while (unread > 0) {
-          int bytesToRead = (int)Math.min(unread, buffer.length);
-          in.readFully(buffer, 0, bytesToRead);
-          out.write(buffer, 0, bytesToRead);
-          unread -= bytesToRead;
-          if (reporter != null) {
-            reporter.progress((length-unread)*progPerByte);
-          }
-      }
-    } finally {
-      out.close();
-    }
+    conf.deleteLocalFiles();
   }
 
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
-      jobConf = (JobConf) conf;
+      this.conf = (JobConf) conf;
     } else {
-      this.jobConf = new JobConf(conf);
+      this.conf = new JobConf(conf);
     }
   }
-
-  public Configuration getConf() {
-    return this.jobConf;
-  }
-
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri May 26 12:43:32 2006
@@ -19,6 +19,9 @@
 import java.io.IOException;
 
 import java.io.*;
+import java.net.URL;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 
 /** The location of a map output file, as passed to a reduce task via the
@@ -83,7 +86,36 @@
   }
 
   public String toString() {
-    return mapTaskId+"@"+host+":"+port;
+    return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" + 
+            mapTaskId;
   }
 
+  /**
+   * Get the map output into a local file from the remote server.
+   * We use the file system so that we generate checksum files on the data.
+   * @param fileSys the filesystem to write the file to
+   * @param localFilename the filename to write the data into
+   * @param reduce the reduce id to get for
+   * @throws IOException when something goes wrong
+   */
+  public long getFile(FileSystem fileSys, 
+                      Path localFilename, int reduce) throws IOException {
+    URL path = new URL(toString() + "&reduce=" + reduce);
+    InputStream input = path.openConnection().getInputStream();
+    OutputStream output = fileSys.create(localFilename);
+    long totalBytes = 0;
+    try {
+      byte[] buffer = new byte[64 * 1024];
+      int len = input.read(buffer);
+      while (len > 0) {
+        totalBytes += len;
+        output.write(buffer, 0 ,len);
+        len = input.read(buffer);
+      }
+    } finally {
+      input.close();
+      output.close();
+    }
+    return totalBytes;
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri May 26 12:43:32 2006
@@ -60,7 +60,7 @@
     this.partition = partition;
   }
 
-  public TaskRunner createRunner(TaskTracker tracker) {
+  public TaskRunner createRunner(TaskTracker tracker) throws IOException {
     return new ReduceTaskRunner(this, tracker, this.conf);
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri May 26 12:43:32 2006
@@ -15,16 +15,14 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
-import java.net.*;
 import java.util.*;
-import java.lang.reflect.Method;
 import java.text.DecimalFormat;
 
-
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
   
@@ -54,11 +52,6 @@
   private int numCopiers;
 
   /**
-   * timeout for copy operations
-   */
-  private int copyTimeout;
-  
-  /**
    * the maximum amount of time (less 1 minute) to wait to 
    * contact a host after a copy from it fails. We wait for (1 min +
    * Random.nextInt(maxBackoff)) seconds.
@@ -82,6 +75,11 @@
   private long lastPollTime;
   
   /**
+   * A reference to the local file system for writing the map outputs to.
+   */
+  private FileSystem localFileSys;
+  
+  /**
    * the minimum interval between jobtracker polls
    */
   private static final long MIN_POLL_INTERVAL = 5000;
@@ -91,22 +89,6 @@
    */
   private static final int PROBE_SAMPLE_SIZE = 50;
 
-  // initialization code to resolve "getFile" to a method object
-  private static Method getFileMethod = null;
-  static {
-    Class[] paramTypes = { String.class, String.class,
-                           int.class, int.class };    
-    try {
-      getFileMethod = 
-        MapOutputProtocol.class.getDeclaredMethod("getFile", paramTypes);
-    }
-    catch (NoSuchMethodException e) {
-      LOG.severe(StringUtils.stringifyException(e));
-      throw new RuntimeException("Can't find \"getFile\" method "
-                                 + "of MapOutputProtocol", e);
-    }
-  }
-  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
     
@@ -174,47 +156,42 @@
     private long copyOutput(MapOutputLocation loc)
     throws IOException {
 
-      Object[] params = new Object[4];
-      params[0] = loc.getMapTaskId();
-      params[1] = reduceTask.getTaskId();
-      params[2] = new Integer(loc.getMapId());
-      params[3] = new Integer(reduceTask.getPartition());
-      
-      LOG.info(reduceTask.getTaskId() + " copy started: " +
-               loc.getMapTaskId() + " from " + loc.getHost());
+      String reduceId = reduceTask.getTaskId();
+      LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
+               " output from " + loc.getHost() + ".");
 
-      Socket sock = new Socket(loc.getHost(), loc.getPort());
       try {
-        sock.setSoTimeout(copyTimeout);
-
         // this copies the map output file
-        MapOutputFile file =
-          (MapOutputFile)RPC.callRaw(getFileMethod, params, sock, conf);
+        Path filename = conf.getLocalPath(reduceId + "/map_" +
+                                          loc.getMapId() + ".out");
+        long bytes = loc.getFile(localFileSys, filename,
+                                 reduceTask.getPartition());
 
-        LOG.info(reduceTask.getTaskId() + " copy finished: " +
-                 loc.getMapTaskId() + " from " + loc.getHost());      
+        LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
+                 " output from " + loc.getHost() + ".");
 
-        return file.getSize();
+        return bytes;
       }
-      finally {
-        try {
-          sock.close();
-        } catch (IOException e) { } // IGNORE
+      catch (IOException e) {
+        LOG.warning(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() +
+                    " output from " + loc.getHost() + ".");
+        throw e;
       }
     }
 
   }
   
-  public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
+  public ReduceTaskRunner(Task task, TaskTracker tracker, 
+                          JobConf conf) throws IOException {
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
+    localFileSys = FileSystem.getNamed("local", conf);
 
     this.reduceTask = (ReduceTask)getTask();
     this.scheduledCopies = new ArrayList(100);
     this.copyResults = new ArrayList(100);    
     this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
-    this.copyTimeout = conf.getInt("ipc.client.timeout", 10000);
     this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
 
     // hosts -> next contact time

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Fri May 26 12:43:32 2006
@@ -24,6 +24,7 @@
 import org.mortbay.http.HttpContext;
 import org.mortbay.http.handler.ResourceHandler;
 import org.mortbay.http.SocketListener;
+import org.mortbay.jetty.servlet.WebApplicationContext;
 
 /**
  * Create a Jetty embedded server to answer http requests. The primary goal
@@ -35,17 +36,26 @@
  * @author Owen O'Malley
  */
 public class StatusHttpServer {
-  private static org.mortbay.jetty.Server webServer = null;
   private static final boolean isWindows = 
     System.getProperty("os.name").startsWith("Windows");
+  private org.mortbay.jetty.Server webServer;
+  private SocketListener listener;
+  private boolean findPort;
+  private WebApplicationContext webAppContext;
   
   /**
    * Create a status server on the given port.
    * The jsp scripts are taken from src/webapps/<name>.
+   * @param name The name of the server
+   * @param port The port to use on the server
+   * @param findPort whether the server should start at the given port and 
+   *        increment by 1 until it finds a free port.
    */
-  public StatusHttpServer(String name, int port) throws IOException {
+  public StatusHttpServer(String name, int port, 
+                          boolean findPort) throws IOException {
     webServer = new org.mortbay.jetty.Server();
-    SocketListener listener = new SocketListener();
+    this.findPort = findPort;
+    listener = new SocketListener();
     listener.setPort(port);
     webServer.addListener(listener);
 
@@ -66,10 +76,30 @@
     webServer.addContext(staticContext);
 
     // set up the context for "/" jsp files
-    webServer.addWebApplication("/", appDir + File.separator + name);      
+    webAppContext = 
+      webServer.addWebApplication("/", appDir + File.separator + name);      
   }
 
   /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name,value);
+  }
+  
+  /**
+   * Get the value in the webapp context.
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+  
+  /**
    * Get the pathname to the webapps files.
    * @return the pathname
    */
@@ -87,11 +117,44 @@
   }
   
   /**
+   * Get the port that the server is on
+   * @return the port
+   */
+  public int getPort() {
+    return listener.getPort();
+  }
+
+  public void setThreads(int min, int max) {
+    listener.setMinThreads(min);
+    listener.setMaxThreads(max);
+  }
+  /**
    * Start the server. Does not wait for the server to start.
    */
   public void start() throws IOException {
     try {
-      webServer.start();
+      while (true) {
+        try {
+          webServer.start();
+          break;
+        } catch (org.mortbay.util.MultiException ex) {
+          // look for the multi exception containing a bind exception,
+          // in that case try the next port number.
+          boolean needNewPort = false;
+          for(int i=0; i < ex.size(); ++i) {
+            Exception sub = ex.getException(i);
+            if (sub instanceof java.net.BindException) {
+              needNewPort = true;
+              break;
+            }
+          }
+          if (!findPort || !needNewPort) {
+            throw ex;
+          } else {
+            listener.setPort(listener.getPort() + 1);
+          }
+        }
+      }
     } catch (IOException ie) {
       throw ie;
     } catch (Exception e) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri May 26 12:43:32 2006
@@ -76,7 +76,8 @@
 
 
   /** Return an approprate thread runner for this task. */
-  public abstract TaskRunner createRunner(TaskTracker tracker);
+  public abstract TaskRunner createRunner(TaskTracker tracker
+                                          ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 1000;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri May 26 12:43:32 2006
@@ -224,28 +224,6 @@
     }
 
     /**
-     * A TaskInProgress might be speculatively executed, and so
-     * can have many taskids simultaneously.  Reduce tasks rely on knowing
-     * their predecessor ids, so they can be sure that all the previous
-     * work has been completed.
-     *
-     * But we don't know ahead of time which task id will actually be
-     * the one that completes for a given Map task.  We don't want the
-     * Reduce task to have to be recreated after Map-completion, or check
-     * in with the JobTracker.  So instead, each TaskInProgress preallocates
-     * all the task-ids it could ever want to run simultaneously.  Then the
-     * Reduce task can be told about all the ids task-ids for a given Map 
-     * TaskInProgress.  If any of the Map TIP's tasks complete, the Reduce
-     * task will know all is well, and can continue.
-     *
-     * Most of the time, only a small number of the possible task-ids will
-     * ever be used.
-     */
-    public String[] getAllPossibleTaskIds() {
-        return totalTaskIds;
-    }
-
-    /**
      * Creates a "status report" for this task.  Includes the
      * task ID and overall status, plus reports for all the
      * component task-threads that have ever been started.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri May 26 12:43:32 2006
@@ -31,8 +31,8 @@
  *
  * @author Mike Cafarella
  *******************************************************/
-public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
-    private static TaskTracker taskTracker = null;
+public class TaskTracker 
+             implements MRConstants, TaskUmbilicalProtocol, Runnable {
     static final long WAIT_FOR_DONE = 3 * 1000;
     private long taskTimeout; 
     private int httpPort;
@@ -100,36 +100,30 @@
       taskCleanupThread.start();
     }
     
-    class MapOutputServer extends RPC.Server {
-      private MapOutputServer(int port, int threads) {
-        super(TaskTracker.this, fConf, port, threads, false);
-      }
-      public TaskTracker getTaskTracker() {
-        return TaskTracker.this;
-      }
-    }
-
     /**
      * Start with the local machine name, and the default JobTracker
      */
-    public TaskTracker(JobConf conf, int httpPort) throws IOException {
-      this(JobTracker.getAddress(conf), conf, httpPort);
-    }
-
-    /**
-     * Start with the local machine name, and the addr of the target JobTracker
-     */
-    public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf,
-                       int httpPort) throws IOException {
-        maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
-
-        this.fConf = conf;
-        this.jobTrackAddr = jobTrackAddr;
-        this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
-        this.mapOutputFile = new MapOutputFile();
-        this.mapOutputFile.setConf(conf);
-        this.httpPort = httpPort;
-        initialize();
+    public TaskTracker(JobConf conf) throws IOException {
+      maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+      this.fConf = conf;
+      this.jobTrackAddr = JobTracker.getAddress(conf);
+      this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
+      this.mapOutputFile = new MapOutputFile();
+      this.mapOutputFile.setConf(conf);
+      int httpPort = conf.getInt("tasktracker.http.port", 50060);
+      StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
+      int workerThreads = conf.getInt("tasktracker.http.threads", 40);
+      server.setThreads(1, workerThreads);
+      server.start();
+      this.httpPort = server.getPort();
+      // let the jsp pages get to the task tracker, config, and other relevant
+      // objects
+      FileSystem local = FileSystem.getNamed("local", conf);
+      server.setAttribute("task.tracker", this);
+      server.setAttribute("local.file.system", local);
+      server.setAttribute("conf", conf);
+      server.setAttribute("log", LOG);
+      initialize();
     }
 
     /**
@@ -164,16 +158,6 @@
             }
         
         }
-        while (true) {
-            try {
-                this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
-                this.mapOutputServer.start();
-                break;
-            } catch (BindException e) {
-                LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");
-                this.mapOutputPort++;
-            }
-        }
         this.taskTrackerName = "tracker_" + 
                                localHostname + ":" + taskReportPort;
         LOG.info("Starting tracker " + taskTrackerName);
@@ -730,17 +714,6 @@
         }
     }
 
-    /////////////////////////////////////////////////////////////////
-    // MapOutputProtocol
-    /////////////////////////////////////////////////////////////////
-    public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
-                                 int mapId, int partition) {
-    MapOutputFile mapOutputFile = 
-      new MapOutputFile(mapTaskId, reduceTaskId, mapId, partition);
-    mapOutputFile.setConf(this.fConf);
-    return mapOutputFile;
-  }
-
     // ///////////////////////////////////////////////////////////////
     // TaskUmbilicalProtocol
     /////////////////////////////////////////////////////////////////
@@ -910,14 +883,6 @@
     }
 
     /**
-     * Get the task tracker for use with the webapp stuff.
-     * @return The task tracker object
-     */
-    static TaskTracker getTracker() {
-      return taskTracker;
-    }
-    
-    /**
      * Get the name for this task tracker.
      * @return the string like "tracker_mymachine:50010"
      */
@@ -958,10 +923,6 @@
 
         JobConf conf=new JobConf();
         LogFormatter.initFileHandler( conf, "tasktracker" );
-        int httpPort = conf.getInt("tasktracker.http.port", 50060);
-        StatusHttpServer server = new StatusHttpServer("task", httpPort);
-        server.start();
-        taskTracker = new TaskTracker(conf, httpPort);
-        taskTracker.run();
+        new TaskTracker(conf).run();
     }
 }

Added: lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp?rev=409723&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp (added)
+++ lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Fri May 26 12:43:32 2006
@@ -0,0 +1,51 @@
+<%@ page
+  contentType="application/octet-stream"
+  session="false"
+  buffer="64kb"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="java.util.logging.Logger"
+  import="org.apache.hadoop.fs.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+%><%
+  String mapId = request.getParameter("map");
+  String reduceId = request.getParameter("reduce");
+  if (mapId == null || reduceId == null) {
+    throw new IOException("map and reduce parameters are required");
+  }
+  int reduce = Integer.parseInt(reduceId);
+  byte[] buffer = new byte[64*1024];
+  OutputStream outStream = response.getOutputStream();
+  JobConf conf = (JobConf) application.getAttribute("conf");
+  FileSystem fileSys = 
+     (FileSystem) application.getAttribute("local.file.system");
+  Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
+  response.resetBuffer();
+  InputStream inStream = null;
+  try {
+    inStream = fileSys.open(filename);
+    int len = inStream.read(buffer);
+    while (len > 0) {
+      outStream.write(buffer, 0, len);
+      len = inStream.read(buffer);
+    }
+  } catch (IOException ie) {
+    TaskTracker tracker = 
+       (TaskTracker) application.getAttribute("task.tracker");
+    Logger log = (Logger) application.getAttribute("log");
+    log.warning("Http server (getMapOutput.jsp): " +
+                StringUtils.stringifyException(ie));
+    tracker.mapOutputLost(mapId);
+    throw ie;
+  } finally {
+    if (inStream != null) {
+      inStream.close();
+    }
+    if (outStream != null) {
+      outStream.close();
+    }
+  }
+%>
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp Fri May 26 12:43:32 2006
@@ -27,7 +27,7 @@
  %>
 
 <%
-  TaskTracker tracker = TaskTracker.getTracker();
+  TaskTracker tracker = (TaskTracker) application.getAttribute("task.tracker");
   String trackerName = tracker.getName();
 %>