You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/15 20:42:37 UTC
svn commit: r406706 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/RPC.java
src/java/org/apache/hadoop/mapred/TaskTracker.java
Author: cutting
Date: Mon May 15 11:42:36 2006
New Revision: 406706
URL: http://svn.apache.org/viewcvs?rev=406706&view=rev
Log:
HADOOP-180. Queue task cleanups so that TaskTracker remains responsive. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 15 11:42:36 2006
@@ -16,6 +16,11 @@
id that no existing blocks already have that id.
(Milind Bhandarkar via cutting)
+ 5. HADOOP-180. Make a daemon thread that does the actual task clean ups, so
+ that the main offerService thread in the taskTracker doesn't get stuck
+ and miss his heartbeat window. This was killing many task trackers as
+ big jobs finished (300+ tasks / node). (omalley via cutting)
+
Release 0.2.1 - 2006-05-12
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Mon May 15 11:42:36 2006
@@ -23,7 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
-import java.util.logging.Logger;
+import java.util.logging.*;
import java.io.*;
import org.apache.hadoop.io.*;
@@ -146,8 +146,11 @@
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
+ long startTime = System.currentTimeMillis();
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address);
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.fine("Call: " + method.getName() + " " + callTime);
return value.get();
}
}
@@ -240,7 +243,7 @@
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
long callTime = System.currentTimeMillis() - startTime;
- LOG.fine("Call: " + call.getMethodName() + " " + callTime);
+ LOG.fine("Served: " + call.getMethodName() + " " + callTime);
if (verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=406706&r1=406705&r2=406706&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon May 15 11:42:36 2006
@@ -72,7 +72,33 @@
private int maxCurrentTasks;
private int failures;
-
+
+ /**
+ * A list of tips that should be cleaned up.
+ */
+ private BlockingQueue tasksToCleanup = new BlockingQueue();
+
+ /**
+ * A daemon-thread that pulls tips off the list of things to cleanup.
+ */
+ private Thread taskCleanupThread =
+ new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();
+ tip.jobHasFinished();
+ } catch (Throwable except) {
+ LOG.warning(StringUtils.stringifyException(except));
+ }
+ }
+ }
+ });
+ {
+ taskCleanupThread.setDaemon(true);
+ taskCleanupThread.start();
+ }
+
class MapOutputServer extends RPC.Server {
private MapOutputServer(int port, int threads) {
super(TaskTracker.this, fConf, port, threads, false);
@@ -108,11 +134,8 @@
* so we can call it again and "recycle" the object after calling
* close().
*/
- void initialize() throws IOException {
+ synchronized void initialize() throws IOException {
this.localHostname = InetAddress.getLocalHost().getHostName();
- this.taskTrackerName = "tracker_" + localHostname + "_" +
- (Math.abs(r.nextInt()) % 100000);
- LOG.info("Starting tracker " + taskTrackerName);
fConf.deleteLocalFiles(SUBDIR);
@@ -148,6 +171,9 @@
this.mapOutputPort++;
}
}
+ this.taskTrackerName = "tracker_" +
+ localHostname + ":" + taskReportPort;
+ LOG.info("Starting tracker " + taskTrackerName);
// Clear out temporary files that might be lying around
this.mapOutputFile.cleanupStorage();
@@ -323,12 +349,11 @@
if (toCloseIds != null) {
synchronized (this) {
for (int i = 0; i < toCloseIds.length; i++) {
- TaskInProgress tip = (TaskInProgress) tasks.get(toCloseIds[i]);
- try {
- tip.jobHasFinished();
- } catch (IOException ie) {
- LOG.info("problem finishing task: " +
- StringUtils.stringifyException(ie));
+ Object tip = tasks.get(toCloseIds[i]);
+ if (tip != null) {
+ tasksToCleanup.put(tip);
+ } else {
+ LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
}
}
}
@@ -376,7 +401,6 @@
}
}
-
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -414,6 +438,51 @@
}
}
+ /**
+ * This class implements a queue that is put between producer and
+ * consumer threads. It will grow without bound.
+ * @author Owen O'Malley
+ */
+ static private class BlockingQueue {
+ private List queue;
+
+ /**
+ * Create an empty queue.
+ */
+ public BlockingQueue() {
+ queue = new ArrayList();
+ }
+
+ /**
+ * Put the given object at the back of the queue.
+ * @param obj
+ */
+ public void put(Object obj) {
+ synchronized (queue) {
+ queue.add(obj);
+ queue.notify();
+ }
+ }
+
+ /**
+ * Take the object at the front of the queue.
+ * It blocks until there is an object available.
+ * @return the head of the queue
+ */
+ public Object take() {
+ synchronized (queue) {
+ while (queue.isEmpty()) {
+ try {
+ queue.wait();
+ } catch (InterruptedException ie) {}
+ }
+ Object result = queue.get(0);
+ queue.remove(0);
+ return result;
+ }
+ }
+ }
+
///////////////////////////////////////////////////////
// TaskInProgress maintains all the info for a Task that
// lives at this TaskTracker. It maintains the Task object,
@@ -641,13 +710,19 @@
* controlling job is all done and the files have been copied
* away, or the task failed and we don't need the remains.
*/
- synchronized void cleanup() throws IOException {
- tasks.remove(task.getTaskId());
- try {
- runner.close();
- } catch (IOException ie) {
+ void cleanup() throws IOException {
+ String taskId = task.getTaskId();
+ LOG.fine("Cleaning up " + taskId);
+ synchronized (TaskTracker.this) {
+ tasks.remove(taskId);
+ synchronized (this) {
+ try {
+ runner.close();
+ } catch (Throwable ie) {
+ }
+ }
}
- this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
}
}