You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/29 23:25:22 UTC
svn commit: r451451 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
src/webapps/task/
Author: cutting
Date: Fri Sep 29 14:25:21 2006
New Revision: 451451
URL: http://svn.apache.org/viewvc?view=rev&rev=451451
Log:
HADOOP-513. Replace map output handling with a servlet. Contributed by Owen.
Removed:
lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep 29 14:25:21 2006
@@ -95,6 +95,11 @@
DFS, prohibiting the use of "." or ".." as directory or file
names. (Wendy Chien via cutting)
+24. HADOOP-513. Replace map output handling with a servlet, rather
+ than a JSP page. This fixes an issue where
+ IllegalStateExceptions were logged, sets content-length
+ correctly, and better handles some errors. (omalley via cutting)
+
Release 0.6.2 (unreleased)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Sep 29 14:25:21 2006
@@ -161,10 +161,7 @@
String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
//create a servlet to serve full-file content
- try {
- this.infoServer.addServlet(null, "/streamFile/*",
- "org.apache.hadoop.dfs.StreamFile", null);
- } catch (Exception e) {LOG.warn("addServlet threw exception", e);}
+ this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.start();
this.dnRegistration.infoPort = this.infoServer.getPort();
datanodeObject = this;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Sep 29 14:25:21 2006
@@ -87,7 +87,7 @@
}
public String toString() {
- return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" +
+ return "http://" + host + ":" + port + "/mapOutput?map=" +
mapTaskId;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Fri Sep 29 14:25:21 2006
@@ -21,11 +21,12 @@
import java.net.URL;
import java.net.URLDecoder;
+import javax.servlet.http.HttpServlet;
+
import org.mortbay.http.HttpContext;
import org.mortbay.http.handler.ResourceHandler;
import org.mortbay.http.SocketListener;
import org.mortbay.jetty.servlet.WebApplicationContext;
-import org.mortbay.jetty.servlet.ServletHttpContext;
/**
* Create a Jetty embedded server to answer http requests. The primary goal
@@ -97,19 +98,33 @@
* @param name The name of the servlet (can be passed as null)
* @param pathSpec The path spec for the servlet
* @param classname The class name for the servlet
- * @param contextPath The context path (can be null, defaults to "/")
*/
- public void addServlet(String name, String pathSpec, String classname,
- String contextPath)
- throws ClassNotFoundException, InstantiationException, IllegalAccessException {
- String tmpContextPath = contextPath;
- if (tmpContextPath == null) tmpContextPath = "/";
- ServletHttpContext context =
- (ServletHttpContext)webServer.getContext(tmpContextPath);
- if (name == null)
- context.addServlet(pathSpec, classname);
- else
- context.addServlet(name, pathSpec, classname);
+ public <T extends HttpServlet>
+ void addServlet(String name, String pathSpec,
+ Class<T> servletClass) {
+ WebApplicationContext context = webAppContext;
+ try {
+ if (name == null) {
+ context.addServlet(pathSpec, servletClass.getName());
+ } else {
+ context.addServlet(name, pathSpec, servletClass.getName());
+ }
+ } catch (ClassNotFoundException ex) {
+ throw makeRuntimeException("Problem instantiating class", ex);
+ } catch (InstantiationException ex) {
+ throw makeRuntimeException("Problem instantiating class", ex);
+ } catch (IllegalAccessException ex) {
+ throw makeRuntimeException("Problem instantiating class", ex);
+ }
+ }
+
+ private static RuntimeException makeRuntimeException(String msg,
+ Throwable cause) {
+ RuntimeException result = new RuntimeException(msg);
+ if (cause != null) {
+ result.initCause(cause);
+ }
+ return result;
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=451451&r1=451450&r2=451451
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 29 14:25:21 2006
@@ -28,8 +28,12 @@
import java.util.*;
import java.util.regex.Pattern;
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsContext;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.net.DNS;
@@ -361,8 +365,6 @@
this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
int workerThreads = conf.getInt("tasktracker.http.threads", 40);
server.setThreads(1, workerThreads);
- server.start();
- this.httpPort = server.getPort();
// let the jsp pages get to the task tracker, config, and other relevant
// objects
FileSystem local = FileSystem.getNamed("local", conf);
@@ -370,6 +372,9 @@
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
server.setAttribute("log", LOG);
+ server.addServlet("mapOutput", "mapOutput", MapOutputServlet.class);
+ server.start();
+ this.httpPort = server.getPort();
initialize();
}
@@ -1317,5 +1322,56 @@
StringUtils.stringifyException(e));
System.exit(-1);
}
+ }
+
+ /**
+ * This class is used in TaskTracker's Jetty to serve the map outputs
+ * to other nodes.
+ * @author Owen O'Malley
+ */
+ public static class MapOutputServlet extends HttpServlet {
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response
+ ) throws ServletException, IOException {
+ String mapId = request.getParameter("map");
+ String reduceId = request.getParameter("reduce");
+ if (mapId == null || reduceId == null) {
+ throw new IOException("map and reduce parameters are required");
+ }
+ ServletContext context = getServletContext();
+ int reduce = Integer.parseInt(reduceId);
+ byte[] buffer = new byte[64*1024];
+ OutputStream outStream = response.getOutputStream();
+ JobConf conf = (JobConf) context.getAttribute("conf");
+ FileSystem fileSys =
+ (FileSystem) context.getAttribute("local.file.system");
+ Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
+ response.setContentLength((int) fileSys.getLength(filename));
+ InputStream inStream = null;
+ try {
+ inStream = fileSys.open(filename);
+ try {
+ int len = inStream.read(buffer);
+ while (len > 0) {
+ outStream.write(buffer, 0, len);
+ len = inStream.read(buffer);
+ }
+ } finally {
+ inStream.close();
+ outStream.close();
+ }
+ } catch (IOException ie) {
+ TaskTracker tracker =
+ (TaskTracker) context.getAttribute("task.tracker");
+ Log log = (Log) context.getAttribute("log");
+ String errorMsg = "getMapOutput(" + mapId + "," + reduceId +
+ ") failed :\n"+
+ StringUtils.stringifyException(ie);
+ log.warn(errorMsg);
+ tracker.mapOutputLost(mapId, errorMsg);
+ response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+ throw ie;
+ }
+ }
}
}