You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/29 23:25:22 UTC

svn commit: r451451 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/webapps/task/

Author: cutting
Date: Fri Sep 29 14:25:21 2006
New Revision: 451451

URL: http://svn.apache.org/viewvc?view=rev&rev=451451
Log:
HADOOP-513.  Replace map output handling with a servlet.  Contributed by Owen.

Removed:
    lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep 29 14:25:21 2006
@@ -95,6 +95,11 @@
     DFS, prohibiting the use of "." or ".." as directory or file
     names.  (Wendy Chien via cutting)
 
+24. HADOOP-513.  Replace map output handling with a servlet, rather
+    than a JSP page.  This fixes an issue where
+    IllegalStateExceptions were logged, sets the Content-Length
+    header correctly, and handles some errors better.  (omalley via cutting)
+
 
 Release 0.6.2 (unreleased)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Sep 29 14:25:21 2006
@@ -161,10 +161,7 @@
         String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
         //create a servlet to serve full-file content
-        try {
-          this.infoServer.addServlet(null, "/streamFile/*",
-                "org.apache.hadoop.dfs.StreamFile", null);
-        } catch (Exception e) {LOG.warn("addServlet threw exception", e);}
+        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
         this.infoServer.start();
         this.dnRegistration.infoPort = this.infoServer.getPort();
         datanodeObject = this;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Sep 29 14:25:21 2006
@@ -87,7 +87,7 @@
   }
 
   public String toString() {
-    return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" + 
+    return "http://" + host + ":" + port + "/mapOutput?map=" + 
             mapTaskId;
   }
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Fri Sep 29 14:25:21 2006
@@ -21,11 +21,12 @@
 import java.net.URL;
 import java.net.URLDecoder;
 
+import javax.servlet.http.HttpServlet;
+
 import org.mortbay.http.HttpContext;
 import org.mortbay.http.handler.ResourceHandler;
 import org.mortbay.http.SocketListener;
 import org.mortbay.jetty.servlet.WebApplicationContext;
-import org.mortbay.jetty.servlet.ServletHttpContext;
 
 /**
  * Create a Jetty embedded server to answer http requests. The primary goal
@@ -97,19 +98,33 @@
    * @param name The name of the servlet (can be passed as null)
    * @param pathSpec The path spec for the servlet
    * @param classname The class name for the servlet
-   * @param contextPath The context path (can be null, defaults to "/")
    */
-  public void addServlet(String name, String pathSpec, String classname,
-     String contextPath) 
- throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    String tmpContextPath = contextPath;
-    if (tmpContextPath == null) tmpContextPath = "/";
-    ServletHttpContext context = 
-                    (ServletHttpContext)webServer.getContext(tmpContextPath);
-    if (name == null)
-      context.addServlet(pathSpec, classname);
-    else
-      context.addServlet(name, pathSpec, classname);
+  public <T extends HttpServlet>
+  void addServlet(String name, String pathSpec, Class<T> servletClass) {
+    // The web application context registers servlets by class name.
+    String servletName = servletClass.getName();
+    try {
+      if (name != null) {
+        webAppContext.addServlet(name, pathSpec, servletName);
+      } else {
+        webAppContext.addServlet(pathSpec, servletName);
+      }
+    } catch (ClassNotFoundException cnfe) {
+      throw makeRuntimeException("Problem instantiating class", cnfe);
+    } catch (InstantiationException ie) {
+      throw makeRuntimeException("Problem instantiating class", ie);
+    } catch (IllegalAccessException iae) {
+      throw makeRuntimeException("Problem instantiating class", iae);
+    }
+  }
+  
+  /** Builds an unchecked exception carrying msg and, if non-null, cause. */
+  private static RuntimeException makeRuntimeException(String msg, Throwable cause) {
+    RuntimeException wrapper = new RuntimeException(msg);
+    if (cause == null) {
+      return wrapper;
+    }
+    return (RuntimeException) wrapper.initCause(cause);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 29 14:25:21 2006
@@ -28,8 +28,12 @@
 import java.util.*;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsContext;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.net.DNS;
 
@@ -361,8 +365,6 @@
       this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
       int workerThreads = conf.getInt("tasktracker.http.threads", 40);
       server.setThreads(1, workerThreads);
-      server.start();
-      this.httpPort = server.getPort();
       // let the jsp pages get to the task tracker, config, and other relevant
       // objects
       FileSystem local = FileSystem.getNamed("local", conf);
@@ -370,6 +372,9 @@
       server.setAttribute("local.file.system", local);
       server.setAttribute("conf", conf);
       server.setAttribute("log", LOG);
+      server.addServlet("mapOutput", "mapOutput", MapOutputServlet.class);
+      server.start();
+      this.httpPort = server.getPort();
       initialize();
     }
 
@@ -1317,5 +1322,112 @@
                      StringUtils.stringifyException(e));
            System.exit(-1);
        }
+    }
+    
+    /**
+     * This class is used in TaskTracker's Jetty to serve the map outputs
+     * to other nodes.
+     * <p>
+     * Expects the request parameters {@code map} (the map task id) and
+     * {@code reduce} (the partition number) and streams back
+     * {@code <map>/part-<reduce>.out}, resolved via JobConf.getLocalPath.
+     * Reads the "conf", "local.file.system", "task.tracker" and "log"
+     * servlet context attributes.
+     * @author Owen O'Malley
+     */
+    public static class MapOutputServlet extends HttpServlet {
+      private static final int BUFFER_SIZE = 64 * 1024;
+
+      /**
+       * Copies one partition of a map output to the response.
+       * Missing or malformed parameters get 400 (Bad Request). If the map
+       * output cannot be read, it is reported lost to the TaskTracker and
+       * 410 (Gone) is sent if the response is not yet committed. A failure
+       * while writing to the client does not mark the map output lost.
+       * @throws IOException if reading the map output or writing the
+       *         response fails; the error is logged first
+       */
+      public void doGet(HttpServletRequest request, 
+                        HttpServletResponse response
+                       ) throws ServletException, IOException {
+        String mapId = request.getParameter("map");
+        String reduceId = request.getParameter("reduce");
+        if (mapId == null || reduceId == null) {
+          response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                             "map and reduce parameters are required");
+          return;
+        }
+        // mapId becomes part of a local path; reject values that could
+        // escape the map's own output directory.
+        if (mapId.indexOf('/') >= 0 || mapId.indexOf('\\') >= 0 ||
+            mapId.indexOf("..") >= 0) {
+          response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                             "invalid map parameter");
+          return;
+        }
+        int reduce;
+        try {
+          reduce = Integer.parseInt(reduceId);
+        } catch (NumberFormatException nfe) {
+          response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                             "reduce parameter must be an integer");
+          return;
+        }
+        ServletContext context = getServletContext();
+        JobConf conf = (JobConf) context.getAttribute("conf");
+        FileSystem fileSys = 
+          (FileSystem) context.getAttribute("local.file.system");
+        InputStream inStream = null;
+        // Only read-side failures mean the map output itself is lost;
+        // write-side failures concern the client connection instead.
+        boolean isInputException = true;
+        try {
+          Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
+          long length = fileSys.getLength(filename);
+          inStream = fileSys.open(filename);
+
+          isInputException = false;
+          if (length <= Integer.MAX_VALUE) {
+            response.setContentLength((int) length);
+          } else {
+            // setContentLength only takes an int
+            response.setHeader("Content-Length", Long.toString(length));
+          }
+          OutputStream outStream = response.getOutputStream();
+          byte[] buffer = new byte[BUFFER_SIZE];
+
+          isInputException = true;
+          int len = inStream.read(buffer);
+          while (len > 0) {
+            isInputException = false;
+            outStream.write(buffer, 0, len);
+            isInputException = true;
+            len = inStream.read(buffer);
+          }
+          isInputException = false;
+          outStream.close();
+        } catch (IOException ie) {
+          TaskTracker tracker = 
+            (TaskTracker) context.getAttribute("task.tracker");
+          Log log = (Log) context.getAttribute("log");
+          String errorMsg = "getMapOutput(" + mapId + "," + reduceId + 
+                            ") failed :\n" +
+                            StringUtils.stringifyException(ie);
+          log.warn(errorMsg);
+          if (isInputException) {
+            tracker.mapOutputLost(mapId, errorMsg);
+          }
+          // sendError on a committed response throws IllegalStateException,
+          // which would replace the failure being reported here.
+          if (!response.isCommitted()) {
+            response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+          }
+          throw ie;
+        } finally {
+          if (inStream != null) {
+            inStream.close();
+          }
+        }
+      }
    }
}