You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/26 21:43:35 UTC
svn commit: r409723 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/
src/webapps/task/
Author: cutting
Date: Fri May 26 12:43:32 2006
New Revision: 409723
URL: http://svn.apache.org/viewvc?rev=409723&view=rev
Log:
HADOOP-254. Use HTTP to transfer map output data to reduce nodes. Contributed by Owen.
Added:
lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
Removed:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputProtocol.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May 26 12:43:32 2006
@@ -69,6 +69,10 @@
LocalFileSystem to include the name of the file.
(Benjamin Reed via cutting)
+19. HADOOP-254. Use HTTP to transfer map output data to reduce
+ nodes. This, together with HADOOP-195, greatly improves the
+ performance of these transfers. (omalley via cutting)
+
Release 0.2.1 - 2006-05-12
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri May 26 12:43:32 2006
@@ -302,6 +302,20 @@
</description>
</property>
+<property>
+ <name>tasktracker.http.threads</name>
+ <value>40</value>
+ <description>The number of worker threads for the http server. This is
+ used for map output fetching.
+ </description>
+</property>
+
+<property>
+ <name>tasktracker.http.port</name>
+ <value>50060</value>
+ <description>The default port for task trackers to use as their http server.
+ </description>
+</property>
<!-- ipc properties -->
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri May 26 12:43:32 2006
@@ -23,7 +23,6 @@
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.util.logging.*;
import java.io.*;
@@ -186,65 +185,6 @@
return values;
}
-
-
- /** Expert: Make an RPC call over the specified socket. Assumes that no other calls
- * are in flight on this connection. */
- public static Object callRaw(Method method, Object[] params,
- Socket sock, Configuration conf)
- throws IOException {
-
- Invocation inv = new Invocation(method, params);
- DataInputStream in =
- new DataInputStream(new BufferedInputStream(sock.getInputStream()));
- DataOutputStream out =
- new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
- String name = new String("Client connection to " +
- sock.getInetAddress().getHostName() +
- ":" + sock.getPort());
-
- try {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine(name + " sending #0");
- }
-
- // write out method invocation
- out.writeInt(0);
- inv.write(out);
- out.flush();
-
- // read return value
- int callId = in.readInt();
-
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine(name + " got response to call #" + callId);
- }
-
- boolean isError = in.readBoolean();
- if (isError) {
- throw new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in));
- }
- else {
-
- Writable wrappedValue = (Writable)ObjectWritable.class.newInstance();
- if (wrappedValue instanceof Configurable) {
- ((Configurable) wrappedValue).setConf(conf);
- }
- wrappedValue.readFields(in);
-
- return method.getReturnType() != Void.TYPE ?
- ((ObjectWritable)wrappedValue).get() : null;
- }
- }
- catch (InstantiationException e) {
- throw new IOException(e.toString());
- }
- catch (IllegalAccessException e) {
- throw new IOException(e.toString());
- }
- }
-
/** Construct a server for a protocol implementation instance listening on a
* port. */
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May 26 12:43:32 2006
@@ -434,7 +434,7 @@
}
this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
- this.infoServer = new StatusHttpServer("job", infoPort);
+ this.infoServer = new StatusHttpServer("job", infoPort, false);
this.infoServer.start();
this.startTime = System.currentTimeMillis();
@@ -821,7 +821,7 @@
result.add(new MapOutputLocation(status.getTaskId(),
mapTasksNeeded[i],
tracker.getHost(),
- tracker.getPort()));
+ tracker.getHttpPort()));
}
}
return (MapOutputLocation[])
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Fri May 26 12:43:32 2006
@@ -17,51 +17,24 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.util.logging.Level;
-import java.io.*;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskTracker.MapOutputServer;
-/** A local file to be transferred via the {@link MapOutputProtocol}. */
-class MapOutputFile implements Writable, Configurable {
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ */
+class MapOutputFile {
- static { // register a ctor
- WritableFactories.setFactory
- (MapOutputFile.class,
- new WritableFactory() {
- public Writable newInstance() { return new MapOutputFile(); }
- });
- }
-
- private String mapTaskId;
- private String reduceTaskId;
- private int mapId;
- private int partition;
- private long size;
+ private JobConf conf;
- /** Permits reporting of file copy progress. */
- public interface ProgressReporter {
- void progress(float progress) throws IOException;
- }
-
- private ThreadLocal REPORTERS = new ThreadLocal();
- private JobConf jobConf;
-
- public void setProgressReporter(ProgressReporter reporter) {
- REPORTERS.set(reporter);
- }
-
/** Create a local map output file name.
* @param mapTaskId a map task id
* @param partition a reduce partition
*/
public Path getOutputFile(String mapTaskId, int partition)
throws IOException {
- return this.jobConf.getLocalPath(mapTaskId+"/part-"+partition+".out");
+ return conf.getLocalPath(mapTaskId+"/part-"+partition+".out");
}
/** Create a local reduce input file name.
@@ -71,12 +44,12 @@
public Path getInputFile(int mapId, String reduceTaskId)
throws IOException {
// TODO *oom* should use a format here
- return this.jobConf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
+ return conf.getLocalPath(reduceTaskId+"/map_"+mapId+".out");
}
/** Removes all of the files related to a task. */
public void removeAll(String taskId) throws IOException {
- this.jobConf.deleteLocalFiles(taskId);
+ conf.deleteLocalFiles(taskId);
}
/**
@@ -84,108 +57,14 @@
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.jobConf.deleteLocalFiles();
- }
-
- /** Construct a file for transfer. */
- public MapOutputFile() {
- }
-
- public MapOutputFile(String mapTaskId, String reduceTaskId,
- int mapId, int partition) {
- this.mapTaskId = mapTaskId;
- this.reduceTaskId = reduceTaskId;
- this.mapId = mapId;
- this.partition = partition;
- }
-
- private FileSystem getLocalFs() throws IOException {
- return FileSystem.getNamed("local", this.jobConf);
- }
-
- public long getSize() {
- return size;
- }
-
- public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, mapTaskId);
- UTF8.writeString(out, reduceTaskId);
- out.writeInt(mapId);
- out.writeInt(partition);
-
- Path file = getOutputFile(mapTaskId, partition);
- FSDataInputStream in = null;
- try {
- // write the length-prefixed file content to the wire
- this.size = getLocalFs().getLength(file);
- out.writeLong(this.size);
- in = getLocalFs().open(file);
- } catch (FileNotFoundException e) {
- TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
- ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
- throw e;
- }
- try {
- byte[] buffer = new byte[65536];
- int l = 0;
-
- while (l != -1) {
- out.write(buffer, 0, l);
- try {
- l = in.read(buffer);
- } catch (IOException e) {
- TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e);
- ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
- throw e;
- }
- }
- } finally {
- in.close();
- }
- }
-
- public void readFields(DataInput in) throws IOException {
- this.mapTaskId = UTF8.readString(in);
- this.reduceTaskId = UTF8.readString(in);
- this.mapId = in.readInt();
- this.partition = in.readInt();
-
- ProgressReporter reporter = (ProgressReporter)REPORTERS.get();
-
- // read the length-prefixed file content into a local file
- Path file = getInputFile(mapId, reduceTaskId);
- long length = in.readLong();
- this.size = length;
-
- float progPerByte = 1.0f / length;
- long unread = length;
- FSDataOutputStream out = getLocalFs().create(file);
- try {
- byte[] buffer = new byte[65536];
- while (unread > 0) {
- int bytesToRead = (int)Math.min(unread, buffer.length);
- in.readFully(buffer, 0, bytesToRead);
- out.write(buffer, 0, bytesToRead);
- unread -= bytesToRead;
- if (reporter != null) {
- reporter.progress((length-unread)*progPerByte);
- }
- }
- } finally {
- out.close();
- }
+ conf.deleteLocalFiles();
}
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
- jobConf = (JobConf) conf;
+ this.conf = (JobConf) conf;
} else {
- this.jobConf = new JobConf(conf);
+ this.conf = new JobConf(conf);
}
}
-
- public Configuration getConf() {
- return this.jobConf;
- }
-
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri May 26 12:43:32 2006
@@ -19,6 +19,9 @@
import java.io.IOException;
import java.io.*;
+import java.net.URL;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
/** The location of a map output file, as passed to a reduce task via the
@@ -83,7 +86,36 @@
}
public String toString() {
- return mapTaskId+"@"+host+":"+port;
+ return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" +
+ mapTaskId;
}
+ /**
+ * Get the map output into a local file from the remote server.
+ * We use the file system so that we generate checksum files on the data.
+ * @param fileSys the filesystem to write the file to
+ * @param localFilename the filename to write the data into
+ * @param reduce the reduce id to get for
+ * @throws IOException when something goes wrong
+ */
+ public long getFile(FileSystem fileSys,
+ Path localFilename, int reduce) throws IOException {
+ URL path = new URL(toString() + "&reduce=" + reduce);
+ InputStream input = path.openConnection().getInputStream();
+ OutputStream output = fileSys.create(localFilename);
+ long totalBytes = 0;
+ try {
+ byte[] buffer = new byte[64 * 1024];
+ int len = input.read(buffer);
+ while (len > 0) {
+ totalBytes += len;
+ output.write(buffer, 0 ,len);
+ len = input.read(buffer);
+ }
+ } finally {
+ input.close();
+ output.close();
+ }
+ return totalBytes;
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri May 26 12:43:32 2006
@@ -60,7 +60,7 @@
this.partition = partition;
}
- public TaskRunner createRunner(TaskTracker tracker) {
+ public TaskRunner createRunner(TaskTracker tracker) throws IOException {
return new ReduceTaskRunner(this, tracker, this.conf);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri May 26 12:43:32 2006
@@ -15,16 +15,14 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import java.io.*;
-import java.net.*;
import java.util.*;
-import java.lang.reflect.Method;
import java.text.DecimalFormat;
-
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
@@ -54,11 +52,6 @@
private int numCopiers;
/**
- * timeout for copy operations
- */
- private int copyTimeout;
-
- /**
* the maximum amount of time (less 1 minute) to wait to
* contact a host after a copy from it fails. We wait for (1 min +
* Random.nextInt(maxBackoff)) seconds.
@@ -82,6 +75,11 @@
private long lastPollTime;
/**
+ * A reference to the local file system for writing the map outputs to.
+ */
+ private FileSystem localFileSys;
+
+ /**
* the minimum interval between jobtracker polls
*/
private static final long MIN_POLL_INTERVAL = 5000;
@@ -91,22 +89,6 @@
*/
private static final int PROBE_SAMPLE_SIZE = 50;
- // initialization code to resolve "getFile" to a method object
- private static Method getFileMethod = null;
- static {
- Class[] paramTypes = { String.class, String.class,
- int.class, int.class };
- try {
- getFileMethod =
- MapOutputProtocol.class.getDeclaredMethod("getFile", paramTypes);
- }
- catch (NoSuchMethodException e) {
- LOG.severe(StringUtils.stringifyException(e));
- throw new RuntimeException("Can't find \"getFile\" method "
- + "of MapOutputProtocol", e);
- }
- }
-
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
@@ -174,47 +156,42 @@
private long copyOutput(MapOutputLocation loc)
throws IOException {
- Object[] params = new Object[4];
- params[0] = loc.getMapTaskId();
- params[1] = reduceTask.getTaskId();
- params[2] = new Integer(loc.getMapId());
- params[3] = new Integer(reduceTask.getPartition());
-
- LOG.info(reduceTask.getTaskId() + " copy started: " +
- loc.getMapTaskId() + " from " + loc.getHost());
+ String reduceId = reduceTask.getTaskId();
+ LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
+ " output from " + loc.getHost() + ".");
- Socket sock = new Socket(loc.getHost(), loc.getPort());
try {
- sock.setSoTimeout(copyTimeout);
-
// this copies the map output file
- MapOutputFile file =
- (MapOutputFile)RPC.callRaw(getFileMethod, params, sock, conf);
+ Path filename = conf.getLocalPath(reduceId + "/map_" +
+ loc.getMapId() + ".out");
+ long bytes = loc.getFile(localFileSys, filename,
+ reduceTask.getPartition());
- LOG.info(reduceTask.getTaskId() + " copy finished: " +
- loc.getMapTaskId() + " from " + loc.getHost());
+ LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() +
+ " output from " + loc.getHost() + ".");
- return file.getSize();
+ return bytes;
}
- finally {
- try {
- sock.close();
- } catch (IOException e) { } // IGNORE
+ catch (IOException e) {
+ LOG.warning(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() +
+ " output from " + loc.getHost() + ".");
+ throw e;
}
}
}
- public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
+ public ReduceTaskRunner(Task task, TaskTracker tracker,
+ JobConf conf) throws IOException {
super(task, tracker, conf);
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
+ localFileSys = FileSystem.getNamed("local", conf);
this.reduceTask = (ReduceTask)getTask();
this.scheduledCopies = new ArrayList(100);
this.copyResults = new ArrayList(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
- this.copyTimeout = conf.getInt("ipc.client.timeout", 10000);
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
// hosts -> next contact time
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Fri May 26 12:43:32 2006
@@ -24,6 +24,7 @@
import org.mortbay.http.HttpContext;
import org.mortbay.http.handler.ResourceHandler;
import org.mortbay.http.SocketListener;
+import org.mortbay.jetty.servlet.WebApplicationContext;
/**
* Create a Jetty embedded server to answer http requests. The primary goal
@@ -35,17 +36,26 @@
* @author Owen O'Malley
*/
public class StatusHttpServer {
- private static org.mortbay.jetty.Server webServer = null;
private static final boolean isWindows =
System.getProperty("os.name").startsWith("Windows");
+ private org.mortbay.jetty.Server webServer;
+ private SocketListener listener;
+ private boolean findPort;
+ private WebApplicationContext webAppContext;
/**
* Create a status server on the given port.
* The jsp scripts are taken from src/webapps/<name>.
+ * @param name The name of the server
+ * @param port The port to use on the server
+ * @param findPort whether the server should start at the given port and
+ * increment by 1 until it finds a free port.
*/
- public StatusHttpServer(String name, int port) throws IOException {
+ public StatusHttpServer(String name, int port,
+ boolean findPort) throws IOException {
webServer = new org.mortbay.jetty.Server();
- SocketListener listener = new SocketListener();
+ this.findPort = findPort;
+ listener = new SocketListener();
listener.setPort(port);
webServer.addListener(listener);
@@ -66,10 +76,30 @@
webServer.addContext(staticContext);
// set up the context for "/" jsp files
- webServer.addWebApplication("/", appDir + File.separator + name);
+ webAppContext =
+ webServer.addWebApplication("/", appDir + File.separator + name);
}
/**
+ * Set a value in the webapp context. These values are available to the jsp
+ * pages as "application.getAttribute(name)".
+ * @param name The name of the attribute
+ * @param value The value of the attribute
+ */
+ public void setAttribute(String name, Object value) {
+ webAppContext.setAttribute(name,value);
+ }
+
+ /**
+ * Get the value in the webapp context.
+ * @param name The name of the attribute
+ * @return The value of the attribute
+ */
+ public Object getAttribute(String name) {
+ return webAppContext.getAttribute(name);
+ }
+
+ /**
* Get the pathname to the webapps files.
* @return the pathname
*/
@@ -87,11 +117,44 @@
}
/**
+ * Get the port that the server is on
+ * @return the port
+ */
+ public int getPort() {
+ return listener.getPort();
+ }
+
+ public void setThreads(int min, int max) {
+ listener.setMinThreads(min);
+ listener.setMaxThreads(max);
+ }
+ /**
* Start the server. Does not wait for the server to start.
*/
public void start() throws IOException {
try {
- webServer.start();
+ while (true) {
+ try {
+ webServer.start();
+ break;
+ } catch (org.mortbay.util.MultiException ex) {
+ // look for the multi exception containing a bind exception,
+ // in that case try the next port number.
+ boolean needNewPort = false;
+ for(int i=0; i < ex.size(); ++i) {
+ Exception sub = ex.getException(i);
+ if (sub instanceof java.net.BindException) {
+ needNewPort = true;
+ break;
+ }
+ }
+ if (!findPort || !needNewPort) {
+ throw ex;
+ } else {
+ listener.setPort(listener.getPort() + 1);
+ }
+ }
+ }
} catch (IOException ie) {
throw ie;
} catch (Exception e) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri May 26 12:43:32 2006
@@ -76,7 +76,8 @@
/** Return an approprate thread runner for this task. */
- public abstract TaskRunner createRunner(TaskTracker tracker);
+ public abstract TaskRunner createRunner(TaskTracker tracker
+ ) throws IOException;
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 1000;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri May 26 12:43:32 2006
@@ -224,28 +224,6 @@
}
/**
- * A TaskInProgress might be speculatively executed, and so
- * can have many taskids simultaneously. Reduce tasks rely on knowing
- * their predecessor ids, so they can be sure that all the previous
- * work has been completed.
- *
- * But we don't know ahead of time which task id will actually be
- * the one that completes for a given Map task. We don't want the
- * Reduce task to have to be recreated after Map-completion, or check
- * in with the JobTracker. So instead, each TaskInProgress preallocates
- * all the task-ids it could ever want to run simultaneously. Then the
- * Reduce task can be told about all the ids task-ids for a given Map
- * TaskInProgress. If any of the Map TIP's tasks complete, the Reduce
- * task will know all is well, and can continue.
- *
- * Most of the time, only a small number of the possible task-ids will
- * ever be used.
- */
- public String[] getAllPossibleTaskIds() {
- return totalTaskIds;
- }
-
- /**
* Creates a "status report" for this task. Includes the
* task ID and overall status, plus reports for all the
* component task-threads that have ever been started.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri May 26 12:43:32 2006
@@ -31,8 +31,8 @@
*
* @author Mike Cafarella
*******************************************************/
-public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
- private static TaskTracker taskTracker = null;
+public class TaskTracker
+ implements MRConstants, TaskUmbilicalProtocol, Runnable {
static final long WAIT_FOR_DONE = 3 * 1000;
private long taskTimeout;
private int httpPort;
@@ -100,36 +100,30 @@
taskCleanupThread.start();
}
- class MapOutputServer extends RPC.Server {
- private MapOutputServer(int port, int threads) {
- super(TaskTracker.this, fConf, port, threads, false);
- }
- public TaskTracker getTaskTracker() {
- return TaskTracker.this;
- }
- }
-
/**
* Start with the local machine name, and the default JobTracker
*/
- public TaskTracker(JobConf conf, int httpPort) throws IOException {
- this(JobTracker.getAddress(conf), conf, httpPort);
- }
-
- /**
- * Start with the local machine name, and the addr of the target JobTracker
- */
- public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf,
- int httpPort) throws IOException {
- maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
-
- this.fConf = conf;
- this.jobTrackAddr = jobTrackAddr;
- this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
- this.mapOutputFile = new MapOutputFile();
- this.mapOutputFile.setConf(conf);
- this.httpPort = httpPort;
- initialize();
+ public TaskTracker(JobConf conf) throws IOException {
+ maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+ this.fConf = conf;
+ this.jobTrackAddr = JobTracker.getAddress(conf);
+ this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
+ this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile.setConf(conf);
+ int httpPort = conf.getInt("tasktracker.http.port", 50060);
+ StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
+ int workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ server.setThreads(1, workerThreads);
+ server.start();
+ this.httpPort = server.getPort();
+ // let the jsp pages get to the task tracker, config, and other relevant
+ // objects
+ FileSystem local = FileSystem.getNamed("local", conf);
+ server.setAttribute("task.tracker", this);
+ server.setAttribute("local.file.system", local);
+ server.setAttribute("conf", conf);
+ server.setAttribute("log", LOG);
+ initialize();
}
/**
@@ -164,16 +158,6 @@
}
}
- while (true) {
- try {
- this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
- this.mapOutputServer.start();
- break;
- } catch (BindException e) {
- LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");
- this.mapOutputPort++;
- }
- }
this.taskTrackerName = "tracker_" +
localHostname + ":" + taskReportPort;
LOG.info("Starting tracker " + taskTrackerName);
@@ -730,17 +714,6 @@
}
}
- /////////////////////////////////////////////////////////////////
- // MapOutputProtocol
- /////////////////////////////////////////////////////////////////
- public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
- int mapId, int partition) {
- MapOutputFile mapOutputFile =
- new MapOutputFile(mapTaskId, reduceTaskId, mapId, partition);
- mapOutputFile.setConf(this.fConf);
- return mapOutputFile;
- }
-
// ///////////////////////////////////////////////////////////////
// TaskUmbilicalProtocol
/////////////////////////////////////////////////////////////////
@@ -910,14 +883,6 @@
}
/**
- * Get the task tracker for use with the webapp stuff.
- * @return The task tracker object
- */
- static TaskTracker getTracker() {
- return taskTracker;
- }
-
- /**
* Get the name for this task tracker.
* @return the string like "tracker_mymachine:50010"
*/
@@ -958,10 +923,6 @@
JobConf conf=new JobConf();
LogFormatter.initFileHandler( conf, "tasktracker" );
- int httpPort = conf.getInt("tasktracker.http.port", 50060);
- StatusHttpServer server = new StatusHttpServer("task", httpPort);
- server.start();
- taskTracker = new TaskTracker(conf, httpPort);
- taskTracker.run();
+ new TaskTracker(conf).run();
}
}
Added: lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp?rev=409723&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp (added)
+++ lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Fri May 26 12:43:32 2006
@@ -0,0 +1,51 @@
+<%@ page
+ contentType="application/octet-stream"
+ session="false"
+ buffer="64kb"
+ import="javax.servlet.*"
+ import="javax.servlet.http.*"
+ import="java.io.*"
+ import="java.util.*"
+ import="java.util.logging.Logger"
+ import="org.apache.hadoop.fs.*"
+ import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.util.*"
+%><%
+ String mapId = request.getParameter("map");
+ String reduceId = request.getParameter("reduce");
+ if (mapId == null || reduceId == null) {
+ throw new IOException("map and reduce parameters are required");
+ }
+ int reduce = Integer.parseInt(reduceId);
+ byte[] buffer = new byte[64*1024];
+ OutputStream outStream = response.getOutputStream();
+ JobConf conf = (JobConf) application.getAttribute("conf");
+ FileSystem fileSys =
+ (FileSystem) application.getAttribute("local.file.system");
+ Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
+ response.resetBuffer();
+ InputStream inStream = null;
+ try {
+ inStream = fileSys.open(filename);
+ int len = inStream.read(buffer);
+ while (len > 0) {
+ outStream.write(buffer, 0, len);
+ len = inStream.read(buffer);
+ }
+ } catch (IOException ie) {
+ TaskTracker tracker =
+ (TaskTracker) application.getAttribute("task.tracker");
+ Logger log = (Logger) application.getAttribute("log");
+ log.warning("Http server (getMapOutput.jsp): " +
+ StringUtils.stringifyException(ie));
+ tracker.mapOutputLost(mapId);
+ throw ie;
+ } finally {
+ if (inStream != null) {
+ inStream.close();
+ }
+ if (outStream != null) {
+ outStream.close();
+ }
+ }
+%>
\ No newline at end of file
Modified: lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp?rev=409723&r1=409722&r2=409723&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp Fri May 26 12:43:32 2006
@@ -27,7 +27,7 @@
%>
<%
- TaskTracker tracker = TaskTracker.getTracker();
+ TaskTracker tracker = (TaskTracker) application.getAttribute("task.tracker");
String trackerName = tracker.getName();
%>