You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/05/27 13:08:21 UTC
svn commit: r779112 - in
/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred:
JobEndNotifier.java JobTracker.java TaskTracker.java TaskTrackerAction.java
TaskTrackerStatus.java
Author: stevel
Date: Wed May 27 11:08:21 2009
New Revision: 779112
URL: http://svn.apache.org/viewvc?rev=779112&view=rev
Log:
HADOOP-3628 bring the JT and TT under the lifecycle
Modified:
hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Wed May 27 11:08:21 2009
@@ -89,7 +89,11 @@
public static void stopNotifier() {
running = false;
- thread.interrupt();
+ //copy into a variable to deal with race conditions
+ Thread notifier = thread;
+ if (notifier != null) {
+ notifier.interrupt();
+ }
}
private static JobEndStatusInfo createNotification(JobConf conf,
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 27 11:08:21 2009
@@ -82,6 +82,7 @@
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Service;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
@@ -90,7 +91,8 @@
* tracking MR jobs in a network environment.
*
*******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service
+ implements MRConstants, InterTrackerProtocol,
JobSubmissionProtocol, TaskTrackerManager,
RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
@@ -119,7 +121,11 @@
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-
+ /**
+ * Time in milliseconds to sleep while trying to start the job tracker:
+ * {@value}
+ */
+ private static final int STARTUP_SLEEP_INTERVAL = 1000;
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -179,7 +185,7 @@
while (true) {
try {
result = new JobTracker(conf);
- result.taskScheduler.setTaskTrackerManager(result);
+ startService(result);
break;
} catch (VersionMismatch e) {
throw e;
@@ -188,19 +194,24 @@
} catch (UnknownHostException e) {
throw e;
} catch (IOException e) {
- LOG.warn("Error starting tracker: " +
- StringUtils.stringifyException(e));
+ LOG.warn("Error starting tracker: " +
+ e.getMessage(), e);
}
- Thread.sleep(1000);
+ Thread.sleep(STARTUP_SLEEP_INTERVAL);
}
- if (result != null) {
+ if (result != null && result.isRunning()) {
JobEndNotifier.startNotifier();
}
return result;
}
- public void stopTracker() throws IOException {
- JobEndNotifier.stopNotifier();
+ /**
+ * This stops the tracker, the JobEndNotifier and moves the service into the
+ * terminated state.
+ *
+ * @throws IOException for any trouble during closedown
+ */
+ public synchronized void stopTracker() throws IOException {
close();
}
@@ -1390,7 +1401,7 @@
}
}
- private final JobTrackerInstrumentation myInstrumentation;
+ private JobTrackerInstrumentation myInstrumentation;
/////////////////////////////////////////////////////////////////
// The real JobTracker
@@ -1516,7 +1527,7 @@
);
// Used to provide an HTML view on Job, Task, and TaskTracker structures
- final HttpServer infoServer;
+ HttpServer infoServer;
int infoPort;
Server interTrackerServer;
@@ -1538,9 +1549,14 @@
private QueueManager queueManager;
/**
- * Start the JobTracker process, listen on the indicated port
+ * Create the JobTracker, based on the configuration
+ * @param conf configuration to use
+ * @throws IOException on problems initializing the tracker
*/
JobTracker(JobConf conf) throws IOException, InterruptedException {
+ super(conf);
+ this.conf = conf;
+
// find the owner of the process
try {
mrOwner = UnixUserGroupInformation.login(conf);
@@ -1568,10 +1584,6 @@
AVERAGE_BLACKLIST_THRESHOLD =
conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
- // This is a directory of temporary submission files. We delete it
- // on startup, and can delete any files that we're done with
- this.conf = conf;
- JobConf jobConf = new JobConf(conf);
initializeTaskMemoryRelatedConfig();
@@ -1587,7 +1599,23 @@
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+ taskScheduler.setTaskTrackerManager(this);
+ }
+ /**
+ * This contains the startup logic moved out of the constructor.
+ * It must never be called directly. Instead call {@link Service#start()} and
+ * let service decide whether to invoke this method once and once only.
+ *
+ * Although most of the initialization work has been performed, the
+ * JobTracker does not go live until {@link #offerService()} is called.
+ * Accordingly, JobTracker does not enter the Live state here.
+ * @throws IOException for any startup problems
+ */
+ protected void innerStart() throws IOException {
+ // This is a directory of temporary submission files. We delete it
+ // on startup, and can delete any files that we're done with
+ JobConf jobConf = new JobConf(conf);
// Set ports, start RPC servers, setup security policy etc.
InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
@@ -1640,6 +1668,9 @@
trackerIdentifier = getDateFormat().format(new Date());
// Initialize instrumentation
+ //this operation is synchronized to stop findbugs warning of inconsistent
+ //access
+ synchronized (this) {
JobTrackerInstrumentation tmp;
Class<? extends JobTrackerInstrumentation> metricsInst =
getInstrumentationClass(jobConf);
@@ -1654,6 +1685,7 @@
tmp = new JobTrackerMetricsInst(this, jobConf);
}
myInstrumentation = tmp;
+ }
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
@@ -1673,6 +1705,9 @@
// if we haven't contacted the namenode go ahead and do it
if (fs == null) {
fs = FileSystem.get(conf);
+ if(fs == null) {
+ throw new IllegalStateException("Unable to bind to the filesystem");
+ }
}
// clean up the system dir, which will only work if hdfs is out of
// safe mode
@@ -1714,9 +1749,15 @@
((RemoteException)ie).getClassName())) {
throw ie;
}
- LOG.info("problem cleaning system directory: " + systemDir, ie);
+ LOG.info("problem cleaning system directory: " + systemDir + ": " + ie,
+ ie);
+ }
+ try {
+ Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted during system directory cleanup ",
+ e);
}
- Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
}
// Prepare for recovery. This is done irrespective of the status of restart
@@ -1756,7 +1797,11 @@
NetworkTopology.DEFAULT_HOST_LEVEL);
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+ //this operation is synchronized to stop findbugs warning of inconsistent
+ //access
+ synchronized(this) {
+ completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
+ }
}
private static SimpleDateFormat getDateFormat() {
@@ -1848,9 +1893,16 @@
}
/**
- * Run forever
+ * Run forever.
+ * Change the system state to indicate that we are live
+ * @throws InterruptedException interrupted operations
+ * @throws IOException IO Problems
*/
public void offerService() throws InterruptedException, IOException {
+ if(!enterLiveState()) {
+ //catch re-entrancy by returning early
+ return;
+ };
taskScheduler.start();
// Start the recovery after starting the scheduler
@@ -1870,25 +1922,70 @@
this.retireJobsThread.start();
expireLaunchingTaskThread.start();
- if (completedJobStatusStore.isActive()) {
- completedJobsStoreThread = new Thread(completedJobStatusStore,
- "completedjobsStore-housekeeper");
- completedJobsStoreThread.start();
+ synchronized (this) {
+ //this is synchronized to stop findbugs warning
+ if (completedJobStatusStore.isActive()) {
+ completedJobsStoreThread = new Thread(completedJobStatusStore,
+ "completedjobsStore-housekeeper");
+ completedJobsStoreThread.start();
+ }
}
+ LOG.info("Starting interTrackerServer");
// start the inter-tracker server once the jt is ready
this.interTrackerServer.start();
- synchronized (this) {
- state = State.RUNNING;
- }
LOG.info("Starting RUNNING");
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
- void close() throws IOException {
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param status a status that can be updated with problems
+ * @throws IOException for any problem
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (infoServer == null || !infoServer.isAlive()) {
+ status.addThrowable(
+ new IOException("TaskTracker HttpServer is not running on port "
+ + infoPort));
+ }
+ if (interTrackerServer == null) {
+ status.addThrowable(
+ new IOException("InterTrackerServer is not running"));
+ }
+ }
+
+ /**
+ * This service shuts down by stopping the
+ * {@link JobEndNotifier} and then closing down the job
+ * tracker
+ *
+ * @throws IOException exceptions which will be logged
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ JobEndNotifier.stopNotifier();
+ closeJobTracker();
+ }
+
+ /**
+ * Close down all the Job tracker threads, and the
+ * task scheduler.
+ * This was package scoped, but has been made private so that
+ * it does not get used. Callers should call {@link #close()} to
+ * stop a JobTracker
+ * @throws IOException if problems occur
+ */
+ private void closeJobTracker() throws IOException {
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
@@ -1901,48 +1998,63 @@
LOG.info("Stopping interTrackerServer");
this.interTrackerServer.stop();
}
- if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
- LOG.info("Stopping expireTrackers");
- this.expireTrackersThread.interrupt();
- try {
- this.expireTrackersThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
- LOG.info("Stopping retirer");
- this.retireJobsThread.interrupt();
- try {
- this.retireJobsThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
+ retireThread("expireTrackersThread", expireTrackersThread);
+ retireThread("retirer", retireJobsThread);
if (taskScheduler != null) {
taskScheduler.terminate();
}
- if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
- LOG.info("Stopping expireLaunchingTasks");
- this.expireLaunchingTaskThread.interrupt();
+ retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+ retireThread("completedJobsStore thread", completedJobsStoreThread);
+ LOG.info("stopped all jobtracker services");
+ }
+
+ /**
+ * Close the filesystem without raising an exception. At the end of this
+ * method, fs==null.
+ * Warning: closing the FS may make it unusable for other clients in the same JVM.
+ */
+ protected synchronized void closeTheFilesystemQuietly() {
+ if (fs != null) {
try {
- this.expireLaunchingTaskThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
+ fs.close();
+ } catch (IOException e) {
+ LOG.warn("When closing the filesystem: " + e, e);
}
+ fs = null;
}
- if (this.completedJobsStoreThread != null &&
- this.completedJobsStoreThread.isAlive()) {
- LOG.info("Stopping completedJobsStore thread");
- this.completedJobsStoreThread.interrupt();
+ }
+
+ /**
+ * Retire a named thread if it is not null and still alive. The thread will be
+ * interrupted and then joined.
+ *
+ * @param name thread name for log messages
+ * @param thread thread -can be null.
+ * @return true if the thread was shut down; false implies this thread was
+ * interrupted.
+ */
+ protected boolean retireThread(String name, Thread thread) {
+ if (thread != null && thread.isAlive()) {
+ LOG.info("Stopping " + name);
+ thread.interrupt();
try {
- this.completedJobsStoreThread.join();
+ thread.join();
} catch (InterruptedException ex) {
- ex.printStackTrace();
+ LOG.info("interruped during " + name + " shutdown", ex);
+ return false;
}
}
- LOG.info("stopped all jobtracker services");
- return;
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return "JobTracker";
}
///////////////////////////////////////////////////////
@@ -2476,7 +2588,7 @@
return numTaskCacheLevels;
}
public int getNumResolvedTaskTrackers() {
- return numResolved;
+ return taskTrackers.size();
}
public int getNumberOfUniqueHosts() {
@@ -3012,6 +3124,7 @@
* Allocates a new JobId string.
*/
public synchronized JobID getNewJobId() throws IOException {
+ verifyServiceState(ServiceState.LIVE);
return new JobID(getTrackerIdentifier(), nextJobId++);
}
@@ -3024,6 +3137,7 @@
* the JobTracker alone.
*/
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+ verifyServiceState(ServiceState.LIVE);
if(jobs.containsKey(jobId)) {
//job already running, don't start twice
return jobs.get(jobId).getStatus();
@@ -3118,6 +3232,10 @@
public synchronized ClusterStatus getClusterStatus(boolean detailed) {
synchronized (taskTrackers) {
+ //backport the service state into the job tracker state
+ State state = getServiceState() == ServiceState.LIVE ?
+ State.RUNNING :
+ State.INITIALIZING;
if (detailed) {
List<List<String>> trackerNames = taskTrackerNames();
return new ClusterStatus(trackerNames.get(0),
@@ -3445,6 +3563,10 @@
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
*/
public String getSystemDir() {
+ if (fs == null) {
+ throw new java.lang.IllegalStateException("Filesystem is null; "
+ + "JobTracker is not live: " + this);
+ }
Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed May 27 11:08:21 2009
@@ -83,6 +83,8 @@
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -94,11 +96,11 @@
* for Task assignments and reporting results.
*
*******************************************************/
-public class TaskTracker
+public class TaskTracker extends Service
implements MRConstants, TaskUmbilicalProtocol, Runnable {
static final long WAIT_FOR_DONE = 3 * 1000;
- private int httpPort;
+ int httpPort;
static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
@@ -120,7 +122,10 @@
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- volatile boolean running = true;
+ /**
+ * Flag used to synchronize running state across threads.
+ */
+ private volatile boolean running = false;
private LocalDirAllocator localDirAllocator;
String taskTrackerName;
@@ -153,7 +158,7 @@
// The filesystem where job files are stored
FileSystem systemFS = null;
- private final HttpServer server;
+ private HttpServer server;
volatile boolean shuttingDown = false;
@@ -304,33 +309,7 @@
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
- private Thread taskCleanupThread =
- new Thread(new Runnable() {
- public void run() {
- while (true) {
- try {
- TaskTrackerAction action = tasksToCleanup.take();
- if (action instanceof KillJobAction) {
- purgeJob((KillJobAction) action);
- } else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
- }
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
- } else {
- LOG.error("Non-delete action given to cleanup thread: "
- + action);
- }
- } catch (Throwable except) {
- LOG.warn(StringUtils.stringifyException(except));
- }
- }
- }
- }, "taskCleanup");
+ private TaskCleanupThread taskCleanupThread;
TaskController getTaskController() {
return taskController;
@@ -424,6 +403,17 @@
* close().
*/
synchronized void initialize() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing Task Tracker: " + this);
+ }
+ //check that the service is in a state that permits initialization.
+
+ //allow this operation in only two service states: started and live
+ verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+ //flip the running switch for our inner threads
+ running = true;
+
localFs = FileSystem.getLocal(fConf);
// use configured nameserver & interface to get local hostname
if (fConf.get("slave.host.name") != null) {
@@ -507,10 +497,17 @@
DistributedCache.purgeCache(this.fConf);
cleanupStorage();
+ //mark as just started; this is used in heartbeats
+ this.justStarted = true;
+ int connectTimeout = fConf
+ .getInt("mapred.task.tracker.connect.timeout", 60000);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
- jobTrackAddr, this.fConf);
+ jobTrackAddr, this.fConf, connectTimeout);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+ }
this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
@@ -555,7 +552,9 @@
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.fConf.deleteLocalFiles();
+ if (fConf != null) {
+ fConf.deleteLocalFiles();
+ }
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -835,25 +834,73 @@
}
}
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param status a status that can be updated with problems
+ * @throws IOException for any problem
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (server == null || !server.isAlive()) {
+ status.addThrowable(
+ new IOException("TaskTracker HttpServer is not running on port "
+ + httpPort));
+ }
+ if (taskReportServer == null) {
+ status.addThrowable(
+ new IOException("TaskTracker Report Server is not running on "
+ + taskReportAddress));
+ }
+ }
+
+ /**
+ * A shutdown request triggers termination
+ * @throws IOException when errors happen during termination
+ */
public synchronized void shutdown() throws IOException {
- shuttingDown = true;
close();
- if (this.server != null) {
- try {
- LOG.info("Shutting down StatusHttpServer");
- this.server.stop();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException exceptions which will be logged
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ synchronized (this) {
+ shuttingDown = true;
+ closeTaskTracker();
+ if (this.server != null) {
+ try {
+ LOG.info("Shutting down StatusHttpServer");
+ this.server.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down TaskTracker", e);
+ }
}
+ stopCleanupThreads();
}
}
+
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws IOException when errors happen during shutdown
*/
- public synchronized void close() throws IOException {
+ public synchronized void closeTaskTracker() throws IOException {
+ if (!running) {
+ //this operation is a no-op when not already running
+ return;
+ }
+ running = false;
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -865,27 +912,37 @@
tip.jobHasFinished(false);
}
- this.running = false;
-
// Clear local storage
cleanupStorage();
// Shutdown the fetcher thread
- this.mapEventsFetcher.interrupt();
+ if (mapEventsFetcher != null) {
+ mapEventsFetcher.interrupt();
+ }
//stop the launchers
- this.mapLauncher.interrupt();
- this.reduceLauncher.interrupt();
-
- jvmManager.stop();
+ if (mapLauncher != null) {
+ mapLauncher.cleanTaskQueue();
+ mapLauncher.interrupt();
+ }
+ if (reduceLauncher != null) {
+ reduceLauncher.cleanTaskQueue();
+ reduceLauncher.interrupt();
+ }
+ if (jvmManager != null) {
+ jvmManager.stop();
+ }
+
// shutdown RPC connections
RPC.stopProxy(jobClient);
// wait for the fetcher thread to exit
for (boolean done = false; !done; ) {
try {
- this.mapEventsFetcher.join();
+ if(mapEventsFetcher != null) {
+ mapEventsFetcher.join();
+ }
done = true;
} catch (InterruptedException e) {
}
@@ -898,10 +955,45 @@
}
/**
- * Start with the local machine name, and the default JobTracker
+ * Create and start a task tracker.
+ * Subclasses must not invoke this constructor, as it may
+ * call their initialisation/startup methods before the construction
+ * is complete.
+ * It is here for backwards compatibility.
+ * @param conf configuration
+ * @throws IOException for problems on startup
*/
public TaskTracker(JobConf conf) throws IOException {
+ this(conf, true);
+ }
+
+ /**
+ * Subclasses should extend this constructor and pass start=false to the
+ * superclass to avoid race conditions in constructors and threads.
+ * @param conf configuration
+ * @param start flag to set to true to start the daemon
+ * @throws IOException for problems on startup
+ */
+ protected TaskTracker(JobConf conf, boolean start) throws IOException {
+ super(conf);
fConf = conf;
+ //for backwards compatibility, the task tracker starts up unless told not
+ //to. Subclasses should be very cautious about having their superclass
+ //do that as subclassed methods can be invoked before the class is fully
+ //configured
+ if (start) {
+ startService(this);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException for any problem.
+ */
+ @Override
+ protected synchronized void innerStart() throws IOException {
+ JobConf conf = fConf;
maxCurrentMapTasks = conf.getInt(
"mapred.tasktracker.map.tasks.maximum", 2);
maxCurrentReduceTasks = conf.getInt(
@@ -944,11 +1036,22 @@
}
private void startCleanupThreads() throws IOException {
+ taskCleanupThread = new TaskCleanupThread();
taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
}
-
+
+ /**
+ * Tell the cleanup threads that they should end themselves
+ */
+ private void stopCleanupThreads() {
+ if (taskCleanupThread != null) {
+ taskCleanupThread.terminate();
+ taskCleanupThread = null;
+ }
+ }
+
/**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
@@ -996,6 +1099,7 @@
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
+ boolean restartingService = true;
while (running && !shuttingDown) {
try {
@@ -1011,6 +1115,7 @@
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
if(justInited) {
+ LOG.debug("Checking build version with JobTracker");
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
@@ -1020,7 +1125,7 @@
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
- LOG.info("Problem reporting to jobtracker: " + e);
+ LOG.info("Problem reporting to jobtracker: " + e, e);
}
return State.DENIED;
}
@@ -1031,6 +1136,9 @@
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("System directory is " + systemDirectory);
+ }
}
// Send the heartbeat and process the jobtracker's directives
@@ -1083,6 +1191,15 @@
return State.STALE;
}
+ //At this point the job tracker is present and compatible,
+ //so the service is coming up.
+ //It is time to declare it as such
+ if (restartingService) {
+ //declare the service as live.
+ enterLiveState();
+ restartingService = false;
+ }
+
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
@@ -1726,6 +1843,8 @@
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
+ //enter the started state; we are no longer live
+ enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
@@ -1734,7 +1853,7 @@
}
}
} finally {
- close();
+ closeTaskTracker();
}
if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
@@ -2704,7 +2823,17 @@
String getName() {
return taskTrackerName;
}
-
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+ }
+
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
boolean sendCounters) {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2821,7 +2950,9 @@
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
(conf.getBoolean("tasktracker.contention.tracking", false));
- new TaskTracker(conf).run();
+ TaskTracker tracker = new TaskTracker(conf, false);
+ Service.startService(tracker);
+ tracker.run();
} catch (Throwable e) {
LOG.error("Can not start task tracker because "+
StringUtils.stringifyException(e));
@@ -3102,8 +3233,70 @@
try {
purgeTask(tip, wasFailure); // Marking it as failed/killed.
} catch (IOException ioe) {
- LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+ LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
+ }
+ }
+ }
+
+ /**
+ * Cleanup queue that can process actions to kill a job or task
+ */
+ private class TaskCleanupThread extends Daemon {
+
+ /**
+ * flag to halt work
+ */
+ private volatile boolean live = true;
+
+
+ /**
+ * Construct a daemon thread.
+ */
+ private TaskCleanupThread() {
+ setName("Task Tracker Task Cleanup Thread");
+ }
+
+ /**
+ * End the daemon. This is done by setting the live flag to false and
+ * interrupting ourselves.
+ */
+ public void terminate() {
+ live = false;
+ interrupt();
+ }
+
+ /**
+ * process task kill actions until told to stop being live.
+ */
+ public void run() {
+ LOG.debug("Task cleanup thread started");
+ while (live) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ TaskInProgress tip;
+ KillTaskAction killAction = (KillTaskAction) action;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " +
+ killAction.getTaskID());
+ purgeTask(tip, false);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (InterruptedException except) {
+ //interrupted. this may have reset the live flag
+ } catch (Throwable except) {
+ LOG.warn("Exception in Cleanup thread: " + except,
+ except);
+ }
}
+ LOG.debug("Task cleanup thread ending");
}
}
+
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Wed May 27 11:08:21 2009
@@ -107,6 +107,15 @@
return actionType;
}
+ /**
+ * {@inheritDoc}
+ * @return the action type.
+ */
+ @Override
+ public String toString() {
+ return "TaskTrackerAction: " + actionType;
+ }
+
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, actionType);
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed May 27 11:08:21 2009
@@ -309,6 +309,18 @@
}
/**
+ * String value prints the basic status of the task tracker
+ * @return a string value for diagnostics
+ */
+ @Override
+ public String toString() {
+ return trackerName
+ + " at http://" + host + ":" + httpPort + "/"
+ + " current task count: " + taskReports.size()
+ + " failed task count: " + failures;
+ }
+
+ /**
* Return the {@link ResourceStatus} object configured with this
* status.
*