You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/19 18:38:45 UTC
svn commit: r669549 - in /hadoop/core/branches/branch-0.18: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: ddas
Date: Thu Jun 19 09:38:45 2008
New Revision: 669549
URL: http://svn.apache.org/viewvc?rev=669549&view=rev
Log:
Merge -r 669546:669547 from trunk onto 0.18 branch. Fixes HADOOP-3546.
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=669549&r1=669548&r2=669549&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Thu Jun 19 09:38:45 2008
@@ -636,6 +636,9 @@
HADOOP-3534. Log IOExceptions that happen in closing the name
system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
+ HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up.
+ (Amareshwari Sriramadasu via ddas)
+
Release 0.17.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=669549&r1=669548&r2=669549&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Jun 19 09:38:45 2008
@@ -297,9 +297,6 @@
public void run() {
while (true) {
try {
- if (tasksToCleanup.isEmpty() && !isRunning()) {
- break;
- }
TaskTrackerAction action = tasksToCleanup.take();
if (action instanceof KillJobAction) {
purgeJob((KillJobAction) action);
@@ -322,10 +319,6 @@
}
}
}, "taskCleanup");
- {
- taskCleanupThread.setDaemon(true);
- taskCleanupThread.start();
- }
private RunningJob addTaskToJob(JobID jobId,
Path localJobFile,
@@ -398,12 +391,9 @@
fConf.get("mapred.tasktracker.dns.nameserver","default"));
}
- directoryCleanupThread = new CleanupQueue(fConf);
- directoryCleanupThread.start();
-
//check local disk
checkLocalDirs(this.fConf.getLocalDirs());
- directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
+ fConf.deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks.clear();
@@ -458,7 +448,6 @@
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
- this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
@@ -801,28 +790,6 @@
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
- // shutdown cleanup threads.
- if (this.taskCleanupThread != null
- && this.taskCleanupThread.isAlive()) {
- LOG.info("Stopping task cleanup thread");
- this.taskCleanupThread.interrupt();
- try {
- this.taskCleanupThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.directoryCleanupThread != null
- && this.directoryCleanupThread.isAlive()) {
- LOG.info("Stopping directory cleanup thread");
- this.directoryCleanupThread.interrupt();
- try {
- this.directoryCleanupThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
-
// shutdown RPC connections
RPC.stopProxy(jobClient);
}
@@ -867,6 +834,14 @@
initialize();
}
+ private void startCleanupThreads() throws IOException {
+ taskCleanupThread.setDaemon(true);
+ taskCleanupThread.start();
+ directoryCleanupThread = new CleanupQueue(originalConf);
+ directoryCleanupThread.setDaemon(true);
+ directoryCleanupThread.start();
+ }
+
/**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
@@ -1355,6 +1330,8 @@
*/
public void run() {
try {
+ startCleanupThreads();
+ this.running = true;
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
@@ -2398,14 +2375,6 @@
}
/**
- * True if task tracker is not shutting down.
- * @return running
- */
- public boolean isRunning() {
- return !shuttingDown;
- }
-
- /**
* This class is used in TaskTracker's Jetty to serve the map outputs
* to other nodes.
*/
@@ -2561,7 +2530,7 @@
}
// cleanup queue which deletes files/directories of the paths queued up.
- private class CleanupQueue extends Thread {
+ private static class CleanupQueue extends Thread {
private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
private JobConf conf;
@@ -2585,9 +2554,6 @@
Path path = null;
while (true) {
try {
- if (queue.isEmpty() && !isRunning()) {
- break;
- }
path = queue.take();
// delete the path.
FileSystem fs = path.getFileSystem(conf);