You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/03/12 18:50:05 UTC
svn commit: r752949 [2/3] - in /hadoop/core:
branches/HADOOP-3628/src/test/org/apache/hadoop/cli/
branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/ trunk/conf/ trunk/ivy/
trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/
trunk/src/c...
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Mar 12 17:50:03 2009
@@ -395,9 +395,7 @@
*/
synchronized void processIOError(int index) {
if (editStreams == null || editStreams.size() <= 1) {
- FSNamesystem.LOG.fatal(
- "Fatal Error : All storage directories are inaccessible.");
- Runtime.getRuntime().exit(-1);
+ processAllStorageInaccessible();
}
assert(index < getNumStorageDirs());
assert(getNumStorageDirs() == editStreams.size());
@@ -417,27 +415,43 @@
//
fsimage.processIOError(parentStorageDir);
}
-
+
+ /**
+ * report inaccessible storage directories and trigger a fatal error
+ */
+ private void processAllStorageInaccessible() {
+ processFatalError("Fatal Error: All storage directories are inaccessible.");
+ }
+
+ /**
+ * Handle a fatal error
+ * @param message message to include in any output
+ */
+ protected void processFatalError(String message) {
+ FSNamesystem.LOG.fatal(message);
+ Runtime.getRuntime().exit(-1);
+ }
+
/**
* If there is an IO Error on any log operations on storage directory,
* remove any stream associated with that directory
*/
synchronized void processIOError(StorageDirectory sd) {
// Try to remove stream only if one should exist
- if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+ if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
return;
+ }
if (editStreams == null || editStreams.size() <= 1) {
- FSNamesystem.LOG.fatal(
- "Fatal Error : All storage directories are inaccessible.");
- Runtime.getRuntime().exit(-1);
+ processAllStorageInaccessible();
}
for (int idx = 0; idx < editStreams.size(); idx++) {
- File parentStorageDir = ((EditLogFileOutputStream)editStreams
- .get(idx)).getFile()
- .getParentFile().getParentFile();
- if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+ File parentStorageDir = ((EditLogFileOutputStream) editStreams
+ .get(idx)).getFile()
+ .getParentFile().getParentFile();
+ if (parentStorageDir.getName().equals(sd.getRoot().getName())) {
editStreams.remove(idx);
- }
+ }
+ }
}
/**
@@ -458,10 +472,8 @@
}
}
if (j == numEditStreams) {
- FSNamesystem.LOG.error("Unable to find sync log on which " +
- " IO error occured. " +
- "Fatal Error.");
- Runtime.getRuntime().exit(-1);
+ processFatalError("Fatal Error: Unable to find sync log on which " +
+ " IO error occured. ");
}
processIOError(j);
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Mar 12 17:50:03 2009
@@ -449,6 +449,59 @@
}
/**
+ * Test whether a thread reference is null or refers to a thread that is not alive
+ * @param thread the thread to check
+ * @return true if the thread is null, not yet started, or already terminated
+ */
+ private boolean isDead(Thread thread) {
+ return thread == null || !thread.isAlive();
+ }
+
+ /**
+ * Perform a cursory health check of the namesystem, particularly that it has
+ * not been closed and that all its worker threads are alive.
+ * @throws IOException if the namesystem is not running or any check fails; the message lists each failure
+ */
+ void ping() throws IOException {
+ if (!fsRunning) {
+ throw new IOException("Namesystem is not running");
+ }
+ boolean bad = false;
+ StringBuilder sb = new StringBuilder();
+ if (isDead(hbthread)) {
+ bad = true;
+ sb.append("[Heartbeat thread is dead]");
+ }
+ if (isDead(replthread)) {
+ bad = true;
+ sb.append("[Replication thread is dead]");
+ }
+ // this thread's liveness is only relevant in safe mode.
+ if (safeMode!=null && isDead(smmthread)) {
+ bad = true;
+ sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
+ }
+ if (isDead(dnthread)) {
+ bad = true;
+ sb.append("[DecommissionedMonitor thread is dead]");
+ }
+ if (isDead(lmthread)) {
+ bad = true;
+ sb.append("[Lease monitor thread is dead]");
+ }
+ if (pendingReplications == null || !pendingReplications.isAlive()) {
+ bad = true;
+ sb.append("[Pending replication thread is dead]");
+ }
+ if (this != getFSNamesystem()) {
+ bad = true;
+ sb.append("[FSNamesystem not a singleton]");
+ }
+ if (bad) {
+ throw new IOException(sb.toString());
+ }
+ }
+ /**
* Close down this file system manager.
* Causes heartbeat and lease daemons to stop; waits briefly for
* them to finish, but a short timeout returns control back to caller.
@@ -470,7 +523,10 @@
lmthread.interrupt();
lmthread.join(3000);
}
- dir.close();
+ if(dir != null) {
+ dir.close();
+ dir = null;
+ }
} catch (InterruptedException ie) {
} catch (IOException ie) {
LOG.error("Error closing FSDirectory", ie);
@@ -1252,9 +1308,13 @@
null,
blockSize);
if (targets.length < this.minReplication) {
- throw new IOException("File " + src + " could only be replicated to " +
- targets.length + " nodes, instead of " +
- minReplication);
+ String message = "File " + src + " could only be replicated to " +
+ targets.length + " nodes, instead of "
+ + minReplication
+ + ". ( there are " + heartbeats.size()
+ + " live data nodes in the cluster)";
+
+ throw new IOException(message);
}
// Allocate a new block and record it in the INode.
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Mar 12 17:50:03 2009
@@ -45,6 +45,7 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Service;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@@ -84,7 +85,7 @@
* NameNode implements the ClientProtocol interface, which allows
* clients to ask for DFS services. ClientProtocol is not
* designed for direct use by authors of DFS client code. End-users
- * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
+ * should instead use the {@link FileSystem} class.
*
* NameNode also implements the DatanodeProtocol interface, used by
* DataNode programs that actually store DFS data blocks. These
@@ -95,7 +96,7 @@
* secondary namenodes or rebalancing processes to get partial namenode's
* state, for example partial blocksMap etc.
**********************************************************/
-public class NameNode implements ClientProtocol, DatanodeProtocol,
+public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
NamenodeProtocol, FSConstants,
RefreshAuthorizationPolicyProtocol {
static{
@@ -133,8 +134,6 @@
/** HTTP server address */
private InetSocketAddress httpAddress = null;
private Thread emptier;
- /** only used for testing purposes */
- private boolean stopRequested = false;
/** Is service level authorization enabled? */
private boolean serviceAuthEnabled = false;
@@ -162,7 +161,17 @@
}
public static InetSocketAddress getAddress(Configuration conf) {
- return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
+ URI fsURI = FileSystem.getDefaultUri(conf);
+ if (fsURI == null) {
+ throw new IllegalArgumentException(
+ "No default filesystem URI in the configuration");
+ }
+ String auth = fsURI.getAuthority();
+ if (auth == null) {
+ throw new IllegalArgumentException(
+ "No authority for the Filesystem URI " + fsURI);
+ }
+ return getAddress(auth);
}
public static URI getUri(InetSocketAddress namenode) {
@@ -175,6 +184,7 @@
* Initialize name-node.
*
* @param conf the configuration
+ * @throws IOException for problems during initialization
*/
private void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);
@@ -261,7 +271,7 @@
}
/**
- * Start NameNode.
+ * Create a NameNode.
* <p>
* The name-node can be started with one of the following startup options:
* <ul>
@@ -280,14 +290,49 @@
* <code>zero</code> in the conf.
*
* @param conf confirguration
- * @throws IOException
+ * @throws IOException for backwards compatibility
*/
public NameNode(Configuration conf) throws IOException {
- try {
- initialize(conf);
- } catch (IOException e) {
- this.stop();
- throw e;
+ super(conf);
+ }
+
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * This method does all the startup.
+ * It is invoked from {@link #start()} when needed.
+ *
+ * @throws IOException for any problem.
+ */
+ @Override
+ protected void innerStart() throws IOException {
+ initialize(getConf());
+ setServiceState(ServiceState.LIVE);
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * This implementation records a failure in {@code status} if the name system is
+ * missing or fails its ping, or if the HTTP server is not alive.
+ * @param status status object that collects any failures found
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (namesystem == null) {
+ status.addThrowable(new LivenessException("No name system"));
+ } else {
+ try {
+ namesystem.ping();
+ } catch (IOException e) {
+ status.addThrowable(e);
+ }
+ }
+ if (httpServer == null || !httpServer.isAlive()) {
+ status.addThrowable(
+ new IOException("NameNode HttpServer is not running"));
}
}
@@ -297,34 +342,81 @@
*/
public void join() {
try {
- this.server.join();
+ if (server != null) {
+ server.join();
+ }
} catch (InterruptedException ie) {
}
}
/**
- * Stop all NameNode threads and wait for all to finish.
+ * {@inheritDoc}
+ * To shut down, this service stops all NameNode threads and waits for them
+ * to finish. It also stops the metrics.
*/
- public void stop() {
- if (stopRequested)
- return;
- stopRequested = true;
+ @Override
+ public synchronized void innerClose() throws IOException {
+ LOG.info("Closing NameNode");
try {
- if (httpServer != null) httpServer.stop();
+ if (httpServer != null) {
+ httpServer.stop();
+ }
} catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
+ LOG.error(StringUtils.stringifyException(e),e);
+ }
+ httpServer = null;
+ if (namesystem != null) {
+ namesystem.close();
+ }
+ if (emptier != null) {
+ emptier.interrupt();
+ emptier = null;
+ }
+ if (server != null) {
+ server.stop();
+ server = null;
}
- if(namesystem != null) namesystem.close();
- if(emptier != null) emptier.interrupt();
- if(server != null) server.stop();
if (myMetrics != null) {
myMetrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
+ namesystem = null;
}
}
+ /**
+ * Retained for backwards compatibility.
+ */
+ public void stop() {
+ closeQuietly();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return "NameNode";
+ }
+
+ /**
+ * Returns the service name, the superclass string, the service state and,
+ * when known, the HTTP and IPC addresses; useful in a debugger or test report.
+ *
+ * @return a string representation of the object.
+ */
+ @Override
+ public String toString() {
+ return getServiceName() + " instance " + super.toString() + " in state "
+ + getServiceState()
+ + (httpAddress != null ? (" at " + httpAddress) : "")
+ + (server != null ? (", IPC " + server.getListenerAddress()) : "");
+ }
+
+
/////////////////////////////////////////////////////
// NamenodeProtocol
/////////////////////////////////////////////////////
@@ -969,6 +1061,7 @@
}
NameNode namenode = new NameNode(conf);
+ deploy(namenode);
return namenode;
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Thu Mar 12 17:50:03 2009
@@ -136,6 +136,15 @@
}
/**
+ * Test for the replicator being alive.
+ * @return true if the thread is running.
+ */
+ boolean isAlive() {
+ Daemon daemon = timerThread;
+ return daemon != null && daemon.isAlive();
+ }
+
+ /**
* An object that contains information about a block that
* is being replicated. It records the timestamp when the
* system started replicating the most recent copy of this
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Mar 12 17:50:03 2009
@@ -1243,11 +1243,26 @@
/**
* Utility that submits a job, then polls for progress until the job is
* complete.
- *
+ *
* @param job the job configuration.
- * @throws IOException
+ * @return the job reference of the completed, successful, job
+ * @throws IOException any IO problem, and job failure
*/
public static RunningJob runJob(JobConf job) throws IOException {
+ return runJob(job, -1);
+ }
+
+ /**
+ * Utility that submits a job, then polls for progress until the job is
+ * complete.
+ *
+ * @param job the job configuration.
+ * @param timeout timeout in milliseconds; any value less than or equal to
+ * zero means "do not time out"
+ * @return the job reference of the completed, successful, job
+ * @throws IOException any IO problem, and job failure
+ */
+ public static RunningJob runJob(JobConf job, long timeout) throws IOException {
JobClient jc = new JobClient(job);
boolean error = true;
RunningJob running = null;
@@ -1255,6 +1270,7 @@
final int MAX_RETRIES = 5;
int retries = MAX_RETRIES;
TaskStatusFilter filter;
+ long endTime = timeout > 0 ? System.currentTimeMillis() + timeout : 0;
try {
filter = getTaskOutputFilter(job);
} catch(IllegalArgumentException e) {
@@ -1351,6 +1367,10 @@
LOG.info("Communication problem with server: " +
StringUtils.stringifyException(ie));
}
+ // check for timeout: fail once the deadline has passed (endTime == 0 means no timeout)
+ if (endTime > 0 && System.currentTimeMillis() > endTime) {
+ throw new IOException("Job execution timed out");
+ }
}
if (!running.isSuccessful()) {
throw new IOException("Job failed!");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Mar 12 17:50:03 2009
@@ -89,7 +89,11 @@
public static void stopNotifier() {
running = false;
- thread.interrupt();
+ //copy into a variable to deal with race conditions
+ Thread notifier = thread;
+ if (notifier != null) {
+ notifier.interrupt();
+ }
}
private static JobEndStatusInfo createNotification(JobConf conf,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 12 17:50:03 2009
@@ -77,13 +77,14 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
/*******************************************************
* JobTracker is the central location for submitting and
* tracking MR jobs in a network environment.
*
*******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service implements MRConstants, InterTrackerProtocol,
JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
static{
@@ -110,7 +111,11 @@
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-
+ /**
+ * Time in milliseconds to sleep while trying to start the job tracker:
+ * {@value}
+ */
+ private static final int STARTUP_SLEEP_INTERVAL = 1000;
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -166,7 +171,7 @@
while (true) {
try {
result = new JobTracker(conf);
- result.taskScheduler.setTaskTrackerManager(result);
+ deploy(result);
break;
} catch (VersionMismatch e) {
throw e;
@@ -175,19 +180,24 @@
} catch (UnknownHostException e) {
throw e;
} catch (IOException e) {
- LOG.warn("Error starting tracker: " +
- StringUtils.stringifyException(e));
+ LOG.warn("Error starting tracker: " +
+ e.getMessage(), e);
}
- Thread.sleep(1000);
+ Thread.sleep(STARTUP_SLEEP_INTERVAL);
}
- if (result != null) {
+ if (result != null && result.isRunning()) {
JobEndNotifier.startNotifier();
}
return result;
}
- public void stopTracker() throws IOException {
- JobEndNotifier.stopNotifier();
+ /**
+ * This stops the tracker, the JobEndNotifier and moves the service into the
+ * terminated state.
+ *
+ * @throws IOException for any trouble during closedown
+ */
+ public synchronized void stopTracker() throws IOException {
close();
}
@@ -1299,7 +1309,7 @@
// (hostname --> Node (NetworkTopology))
Map<String, Node> hostnameToNodeMap =
Collections.synchronizedMap(new TreeMap<String, Node>());
-
+
// Number of resolved entries
int numResolved;
@@ -1351,7 +1361,7 @@
);
// Used to provide an HTML view on Job, Task, and TaskTracker structures
- final HttpServer infoServer;
+ HttpServer infoServer;
int infoPort;
Server interTrackerServer;
@@ -1366,9 +1376,13 @@
private QueueManager queueManager;
/**
- * Start the JobTracker process, listen on the indicated port
+ * Create the JobTracker, based on the configuration
+ * @param conf configuration to use
+ * @throws IOException on problems initializing the tracker
*/
JobTracker(JobConf conf) throws IOException, InterruptedException {
+ super(conf);
+ this.conf = conf;
//
// Grab some static constants
//
@@ -1386,10 +1400,6 @@
AVERAGE_BLACKLIST_THRESHOLD =
conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
- // This is a directory of temporary submission files. We delete it
- // on startup, and can delete any files that we're done with
- this.conf = conf;
- JobConf jobConf = new JobConf(conf);
// Read the hosts/exclude files to restrict access to the jobtracker.
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -1402,7 +1412,23 @@
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+ taskScheduler.setTaskTrackerManager(this);
+ }
+ /**
+ * This contains the startup logic moved out of the constructor.
+ * It must never be called directly. Instead call {@link Service#start()} and
+ * let the Service decide whether to invoke it, so that it runs once and only once.
+ *
+ * Although this method performs most of the initialization work, the
+ * JobTracker does not go live until {@link #offerService()} is called.
+ * Accordingly, the JobTracker does not enter the LIVE state here.
+ * @throws IOException for any startup problems
+ */
+ protected void innerStart() throws IOException {
+ // This is a directory of temporary submission files. We delete it
+ // on startup, and can delete any files that we're done with
+ JobConf jobConf = new JobConf(conf);
// Set ports, start RPC servers, setup security policy etc.
InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
@@ -1458,15 +1484,19 @@
trackerIdentifier = getDateFormat().format(new Date());
Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
- try {
- java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
- metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
- this.myInstrumentation = c.newInstance(this, jobConf);
- } catch(Exception e) {
- //Reflection can throw lots of exceptions -- handle them all by
- //falling back on the default.
- LOG.error("failed to initialize job tracker metrics", e);
- this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+ //this operation is synchronized to stop findbugs warning of inconsistent
+ //access
+ synchronized (this) {
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+ this.myInstrumentation = c.newInstance(this, jobConf);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+ }
}
@@ -1488,6 +1518,9 @@
// if we haven't contacted the namenode go ahead and do it
if (fs == null) {
fs = FileSystem.get(conf);
+ if(fs == null) {
+ throw new IllegalStateException("Unable to bind to the filesystem");
+ }
}
// clean up the system dir, which will only work if hdfs is out of
// safe mode
@@ -1529,9 +1562,14 @@
((RemoteException)ie).getClassName())) {
throw ie;
}
- LOG.info("problem cleaning system directory: " + systemDir, ie);
+ LOG.info("problem cleaning system directory: " + systemDir + ": " + ie, ie);
+ }
+ try {
+ Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted during system directory cleanup ",
+ e);
}
- Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
}
// Same with 'localDir' except it's always on the local disk.
jobConf.deleteLocalFiles(SUBDIR);
@@ -1553,7 +1591,11 @@
NetworkTopology.DEFAULT_HOST_LEVEL);
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+ //this operation is synchronized to stop findbugs warning of inconsistent
+ //access
+ synchronized(this) {
+ completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
+ }
}
private static SimpleDateFormat getDateFormat() {
@@ -1618,9 +1660,16 @@
}
/**
- * Run forever
+ * Run forever.
+ * Change the system state to indicate that we are live
+ * @throws InterruptedException interrupted operations
+ * @throws IOException IO Problems
*/
public void offerService() throws InterruptedException, IOException {
+ if (!enterLiveState()) {
+ // catch re-entrancy by returning early
+ return;
+ }
taskScheduler.start();
// Start the recovery after starting the scheduler
@@ -1637,79 +1686,139 @@
this.retireJobsThread.start();
expireLaunchingTaskThread.start();
- if (completedJobStatusStore.isActive()) {
- completedJobsStoreThread = new Thread(completedJobStatusStore,
- "completedjobsStore-housekeeper");
- completedJobsStoreThread.start();
+ synchronized (this) {
+ //this is synchronized to stop findbugs warning
+ if (completedJobStatusStore.isActive()) {
+ completedJobsStoreThread = new Thread(completedJobStatusStore,
+ "completedjobsStore-housekeeper");
+ completedJobsStoreThread.start();
+ }
}
+ LOG.info("Starting interTrackerServer");
// start the inter-tracker server once the jt is ready
this.interTrackerServer.start();
- synchronized (this) {
- state = State.RUNNING;
- }
- LOG.info("Starting RUNNING");
+ LOG.info("Starting RUNNING");
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
- void close() throws IOException {
- if (this.infoServer != null) {
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ * Records a failure if the HTTP info server is not alive or the RPC server is null.
+ * @param status a status that is updated with any problems found
+ * @throws IOException for any problem
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (infoServer == null || !infoServer.isAlive()) {
+ status.addThrowable(
+ new IOException("JobTracker HttpServer is not running on port "
+ + infoPort));
+ }
+ if (interTrackerServer == null) {
+ status.addThrowable(
+ new IOException("InterTrackerServer is not running"));
+ }
+ }
+
+ /**
+ * This service shuts down by stopping the
+ * {@link JobEndNotifier} and then closing down the job
+ * tracker
+ *
+ * @throws IOException exceptions which will be logged
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ JobEndNotifier.stopNotifier();
+ closeJobTracker();
+ }
+
+ /**
+ * Close down all the Job tracker threads, and the
+ * task scheduler.
+ * This was package scoped, but has been made private so that
+ * it does not get used. Callers should call {@link #close()} to
+ * stop a JobTracker
+ * @throws IOException if problems occur
+ */
+ private void closeJobTracker() throws IOException {
+ if (infoServer != null) {
LOG.info("Stopping infoServer");
try {
- this.infoServer.stop();
+ infoServer.stop();
} catch (Exception ex) {
LOG.warn("Exception shutting down JobTracker", ex);
}
}
- if (this.interTrackerServer != null) {
+ if (interTrackerServer != null) {
LOG.info("Stopping interTrackerServer");
- this.interTrackerServer.stop();
- }
- if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
- LOG.info("Stopping expireTrackers");
- this.expireTrackersThread.interrupt();
- try {
- this.expireTrackersThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
- LOG.info("Stopping retirer");
- this.retireJobsThread.interrupt();
- try {
- this.retireJobsThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
+ interTrackerServer.stop();
}
+ retireThread("expireTrackersThread", expireTrackersThread);
+ retireThread("retirer", retireJobsThread);
if (taskScheduler != null) {
taskScheduler.terminate();
}
- if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
- LOG.info("Stopping expireLaunchingTasks");
- this.expireLaunchingTaskThread.interrupt();
+ retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+ retireThread("completedJobsStore thread", completedJobsStoreThread);
+ LOG.info("stopped all jobtracker services");
+ }
+
+ /**
+ * Close the filesystem without raising an exception. At the end of this
+ * method, fs==null.
+ * Warning: closing the FS may make it unusable for other clients in the same JVM.
+ */
+ protected synchronized void closeTheFilesystemQuietly() {
+ if (fs != null) {
try {
- this.expireLaunchingTaskThread.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
+ fs.close();
+ } catch (IOException e) {
+ LOG.warn("When closing the filesystem: " + e, e);
}
+ fs = null;
}
- if (this.completedJobsStoreThread != null &&
- this.completedJobsStoreThread.isAlive()) {
- LOG.info("Stopping completedJobsStore thread");
- this.completedJobsStoreThread.interrupt();
+ }
+
+ /**
+ * Retire a named thread if it is not null and still alive. The thread will be
+ * interrupted and then joined.
+ *
+ * @param name thread name for log messages
+ * @param thread thread -can be null.
+ * @return true if the thread is absent, already stopped, or was shut down; false if
+ * the calling thread was interrupted while waiting for it to finish.
+ */
+ protected boolean retireThread(String name, Thread thread) {
+ if (thread != null && thread.isAlive()) {
+ LOG.info("Stopping " + name);
+ thread.interrupt();
try {
- this.completedJobsStoreThread.join();
+ thread.join();
} catch (InterruptedException ex) {
- ex.printStackTrace();
+ LOG.info("interruped during " + name + " shutdown", ex);
+ return false;
}
}
- LOG.info("stopped all jobtracker services");
- return;
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return "JobTracker";
}
///////////////////////////////////////////////////////
@@ -1847,7 +1956,7 @@
}
/**
- * Call {@link #removeTaskEntry(String)} for each of the
+ * Call {@link #removeTaskEntry(TaskAttemptID)} for each of the
* job's tasks.
* When the JobTracker is retiring the long-completed
* job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
@@ -2230,7 +2339,7 @@
return numTaskCacheLevels;
}
public int getNumResolvedTaskTrackers() {
- return numResolved;
+ return taskTrackers.size();
}
public int getNumberOfUniqueHosts() {
@@ -2757,6 +2866,7 @@
* Allocates a new JobId string.
*/
public synchronized JobID getNewJobId() throws IOException {
+ verifyServiceState(ServiceState.LIVE);
return new JobID(getTrackerIdentifier(), nextJobId++);
}
@@ -2769,6 +2879,7 @@
* the JobTracker alone.
*/
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+ verifyServiceState(ServiceState.LIVE);
if(jobs.containsKey(jobId)) {
//job already running, don't start twice
return jobs.get(jobId).getStatus();
@@ -2851,6 +2962,10 @@
public synchronized ClusterStatus getClusterStatus(boolean detailed) {
synchronized (taskTrackers) {
+ //backport the service state into the job tracker state
+ State state = getServiceState() == ServiceState.LIVE ?
+ State.RUNNING :
+ State.INITIALIZING;
if (detailed) {
List<List<String>> trackerNames = taskTrackerNames();
return new ClusterStatus(trackerNames.get(0),
@@ -3167,6 +3282,10 @@
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
*/
public String getSystemDir() {
+ if (fs == null) {
+ throw new java.lang.IllegalStateException("Filesystem is null; "
+ + "JobTracker is not live: " + this);
+ }
Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 12 17:50:03 2009
@@ -81,11 +81,12 @@
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -94,13 +95,19 @@
* in a networked environment. It contacts the JobTracker
* for Task assignments and reporting results.
*
+ *
+ * The TaskTracker has a complex lifecycle in that it
+ * can be "recycled"; after {@link #closeTaskTracker()} is called,
+ * it can be reset using {@link #initialize()}. This is
+ * within the {@link Service} lifecycle.
*******************************************************/
-public class TaskTracker
+public class TaskTracker extends Service
implements MRConstants, TaskUmbilicalProtocol, Runnable {
- static final long WAIT_FOR_DONE = 3 * 1000;
- private int httpPort;
+ /** time to wait for a finished task to be reported as done: {@value}*/
+ private static final long WAIT_FOR_DONE = 3 * 1000;
+ int httpPort;
- static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+ enum State {NORMAL, STALE, INTERRUPTED, DENIED}
static{
Configuration.addDefaultResource("mapred-default.xml");
@@ -119,7 +126,10 @@
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- volatile boolean running = true;
+ /**
+ * Flag used to synchronize running state across threads.
+ */
+ private volatile boolean running = false;
private LocalDirAllocator localDirAllocator;
String taskTrackerName;
@@ -134,7 +144,7 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
- /*
+ /**
* This is the last 'status' report sent by this tracker to the JobTracker.
*
* If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -144,14 +154,15 @@
*/
TaskTrackerStatus status = null;
- // The system-directory on HDFS where job files are stored
+ /** The system-directory on HDFS where job files are stored */
Path systemDirectory = null;
- // The filesystem where job files are stored
+ /** The filesystem where job files are stored */
FileSystem systemFS = null;
- private final HttpServer server;
+ private HttpServer server;
+ /** Flag used to synchronize startup across threads. */
volatile boolean shuttingDown = false;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -344,33 +355,7 @@
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
- private Thread taskCleanupThread =
- new Thread(new Runnable() {
- public void run() {
- while (true) {
- try {
- TaskTrackerAction action = tasksToCleanup.take();
- if (action instanceof KillJobAction) {
- purgeJob((KillJobAction) action);
- } else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
- }
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
- } else {
- LOG.error("Non-delete action given to cleanup thread: "
- + action);
- }
- } catch (Throwable except) {
- LOG.warn(StringUtils.stringifyException(except));
- }
- }
- }
- }, "taskCleanup");
+ private TaskCleanupThread taskCleanupThread;
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
@@ -481,11 +466,24 @@
}
/**
- * Do the real constructor work here. It's in a separate method
+ * Initialize the connection.
+ * This method will block until a job tracker is found
+ * It's in a separate method
* so we can call it again and "recycle" the object after calling
* close().
*/
synchronized void initialize() throws IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Initializing Task Tracker: " + this);
+ }
+ //check that the server is not already live.
+
+ //allow this operation in only two service states: started and live
+ verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+ //flip the running switch for our inner threads
+ running = true;
+
// use configured nameserver & interface to get local hostname
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
@@ -572,10 +570,17 @@
DistributedCache.purgeCache(this.fConf);
cleanupStorage();
+ //mark as just started; this is used in heartbeats
+ this.justStarted = true;
+ int connectTimeout = fConf
+ .getInt("mapred.task.tracker.connect.timeout", 60000);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
- jobTrackAddr, this.fConf);
+ jobTrackAddr, fConf, connectTimeout);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+ }
this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
@@ -611,7 +616,9 @@
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.fConf.deleteLocalFiles();
+ if (fConf != null) {
+ fConf.deleteLocalFiles();
+ }
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -890,25 +897,117 @@
}
}
- public synchronized void shutdown() throws IOException {
- shuttingDown = true;
- close();
- if (this.server != null) {
- try {
- LOG.info("Shutting down StatusHttpServer");
- this.server.stop();
+ /////////////////////////////////////////////////////
+ // Service Lifecycle
+ /////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException for any problem.
+ */
+ @Override
+ protected synchronized void innerStart() throws IOException {
+ JobConf conf = fConf;
+ maxCurrentMapTasks = conf.getInt(
+ "mapred.tasktracker.map.tasks.maximum", 2);
+ maxCurrentReduceTasks = conf.getInt(
+ "mapred.tasktracker.reduce.tasks.maximum", 2);
+ this.jobTrackAddr = JobTracker.getAddress(conf);
+ String infoAddr =
+ NetUtils.getServerAddress(conf,
+ "tasktracker.http.bindAddress",
+ "tasktracker.http.port",
+ "mapred.task.tracker.http.address");
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+ String httpBindAddress = infoSocAddr.getHostName();
+ int port = infoSocAddr.getPort();
+ this.server = new HttpServer("task", httpBindAddress, port,
+ port == 0, conf);
+ workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+ server.setThreads(1, workerThreads);
+ // let the jsp pages get to the task tracker, config, and other relevant
+ // objects
+ FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ server.setAttribute("task.tracker", this);
+ server.setAttribute("local.file.system", local);
+ server.setAttribute("conf", conf);
+ server.setAttribute("log", LOG);
+ server.setAttribute("localDirAllocator", localDirAllocator);
+ server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+ server.addInternalServlet("mapOutput", "/mapOutput",
+ MapOutputServlet.class);
+ server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+ server.start();
+ this.httpPort = server.getPort();
+ initialize();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param status a status that can be updated with problems
+ * @throws IOException for any problem
+ */
+ @Override
+ public void innerPing(ServiceStatus status) throws IOException {
+ if (server == null || !server.isAlive()) {
+ status.addThrowable(
+ new IOException("TaskTracker HttpServer is not running on port "
+ + httpPort));
+ }
+ if (taskReportServer == null) {
+ status.addThrowable(
+ new IOException("TaskTracker Report Server is not running on "
+ + taskReportAddress));
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IOException exceptions which will be logged
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ synchronized (this) {
+ shuttingDown = true;
+ closeTaskTracker();
+ if (this.server != null) {
+ try {
+ LOG.info("Shutting down StatusHttpServer");
+ this.server.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down TaskTracker", e);
+ }
}
+ stopCleanupThreads();
}
}
+
+ /**
+ * A shutdown request triggers termination
+ * @throws IOException when errors happen during termination
+ */
+ public synchronized void shutdown() throws IOException {
+ close();
+ }
+
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
+ * @throws IOException when errors happen during shutdown
*/
- public synchronized void close() throws IOException {
+ public synchronized void closeTaskTracker() throws IOException {
+ if (!running) {
+ //this operation is a no-op when not already running
+ return;
+ }
+ running = false;
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -920,27 +1019,37 @@
tip.jobHasFinished(false);
}
- this.running = false;
-
// Clear local storage
cleanupStorage();
// Shutdown the fetcher thread
- this.mapEventsFetcher.interrupt();
+ if (mapEventsFetcher != null) {
+ mapEventsFetcher.interrupt();
+ }
//stop the launchers
- this.mapLauncher.interrupt();
- this.reduceLauncher.interrupt();
-
- jvmManager.stop();
+ if (mapLauncher != null) {
+ mapLauncher.cleanTaskQueue();
+ mapLauncher.interrupt();
+ }
+ if (reduceLauncher != null) {
+ reduceLauncher.cleanTaskQueue();
+ reduceLauncher.interrupt();
+ }
+ if (jvmManager != null) {
+ jvmManager.stop();
+ }
+
// shutdown RPC connections
RPC.stopProxy(jobClient);
// wait for the fetcher thread to exit
for (boolean done = false; !done; ) {
try {
- this.mapEventsFetcher.join();
+ if(mapEventsFetcher != null) {
+ mapEventsFetcher.join();
+ }
done = true;
} catch (InterruptedException e) {
}
@@ -953,52 +1062,54 @@
}
/**
- * Start with the local machine name, and the default JobTracker
+ * Create and start a task tracker.
+ * Subclasses must not subclass this constructor, as it may
+ * call their initialisation/startup methods before the constructor
+ * is complete.
+ * It is here for backwards compatibility.
+ * @param conf configuration
+ * @throws IOException for problems on startup
*/
public TaskTracker(JobConf conf) throws IOException {
+ this(conf, true);
+ }
+
+ /**
+ * Subclasses should extend this constructor and pass start=false to the
+ * superclass to avoid race conditions in constructors and threads.
+ * @param conf configuration
+ * @param start flag to set to true to start the daemon
+ * @throws IOException for problems on startup
+ */
+ protected TaskTracker(JobConf conf, boolean start) throws IOException {
+ super(conf);
fConf = conf;
- maxCurrentMapTasks = conf.getInt(
- "mapred.tasktracker.map.tasks.maximum", 2);
- maxCurrentReduceTasks = conf.getInt(
- "mapred.tasktracker.reduce.tasks.maximum", 2);
- this.jobTrackAddr = JobTracker.getAddress(conf);
- String infoAddr =
- NetUtils.getServerAddress(conf,
- "tasktracker.http.bindAddress",
- "tasktracker.http.port",
- "mapred.task.tracker.http.address");
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
- String httpBindAddress = infoSocAddr.getHostName();
- int httpPort = infoSocAddr.getPort();
- this.server = new HttpServer("task", httpBindAddress, httpPort,
- httpPort == 0, conf);
- workerThreads = conf.getInt("tasktracker.http.threads", 40);
- this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
- server.setThreads(1, workerThreads);
- // let the jsp pages get to the task tracker, config, and other relevant
- // objects
- FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
- server.setAttribute("task.tracker", this);
- server.setAttribute("local.file.system", local);
- server.setAttribute("conf", conf);
- server.setAttribute("log", LOG);
- server.setAttribute("localDirAllocator", localDirAllocator);
- server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
- server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
- server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
- server.start();
- this.httpPort = server.getPort();
- initialize();
+ //for backwards compatibility, the task tracker starts up unless told not
+ //to. Subclasses should be very cautious about having their superclass
+ //do that as subclassed methods can be invoked before the class is fully
+ //configured
+ if(start) {
+ deploy(this);
+ }
}
private void startCleanupThreads() throws IOException {
- taskCleanupThread.setDaemon(true);
+ taskCleanupThread = new TaskCleanupThread();
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
}
/**
+ * Tell the cleanup threads that they should end themselves
+ */
+ private void stopCleanupThreads() {
+ if (taskCleanupThread != null) {
+ taskCleanupThread.terminate();
+ taskCleanupThread = null;
+ }
+ }
+
+ /**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
*/
@@ -1045,6 +1156,7 @@
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
+ boolean restartingService = true;
while (running && !shuttingDown) {
try {
@@ -1060,6 +1172,7 @@
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
if(justInited) {
+ LOG.debug("Checking build version with JobTracker");
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
@@ -1069,7 +1182,7 @@
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
- LOG.info("Problem reporting to jobtracker: " + e);
+ LOG.info("Problem reporting to jobtracker: " + e, e);
}
return State.DENIED;
}
@@ -1080,6 +1193,9 @@
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("System directory is " + systemDirectory);
+ }
}
// Send the heartbeat and process the jobtracker's directives
@@ -1132,6 +1248,15 @@
return State.STALE;
}
+ //At this point the job tracker is present and compatible,
+ //so the service is coming up.
+ //It is time to declare it as such
+ if (restartingService) {
+ //declare the service as live.
+ enterLiveState();
+ restartingService = false;
+ }
+
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
@@ -1157,15 +1282,16 @@
//we've cleaned up, resume normal operation
if (!acceptNewTasks && isIdle()) {
+ LOG.info("ready to accept new tasks again");
acceptNewTasks=true;
}
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
return State.INTERRUPTED;
} catch (DiskErrorException de) {
- String msg = "Exiting task tracker for disk error:\n" +
+ String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
- LOG.error(msg);
+ LOG.error(msg, de);
synchronized (this) {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
@@ -1177,10 +1303,9 @@
LOG.info("Tasktracker disallowed by JobTracker.");
return State.DENIED;
}
- } catch (Exception except) {
- String msg = "Caught exception: " +
- StringUtils.stringifyException(except);
- LOG.error(msg);
+ } catch (IOException except) {
+ String msg = "Caught exception: " + except;
+ LOG.error(msg, except);
}
}
@@ -1491,6 +1616,7 @@
localMinSpaceKill = minSpaceKill;
}
if (!enoughFreeSpace(localMinSpaceKill)) {
+ LOG.info("Tasktracker running out of space -not accepting new tasks");
acceptNewTasks=false;
//we give up! do not accept new tasks until
//all the ones running have finished and they're all cleared up
@@ -1752,7 +1878,7 @@
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
- LOG.warn(msg);
+ LOG.warn(msg, e);
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
@@ -1812,17 +1938,22 @@
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
+ //enter the started state; we are no longer live
+ enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
+ LOG.info("Interrupted while waiting for the job tracker", ie);
}
}
}
}
} finally {
- close();
+ closeTaskTracker();
+ }
+ if (shuttingDown) {
+ return;
}
- if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
@@ -1830,8 +1961,7 @@
shutdown();
}
} catch (IOException iex) {
- LOG.error("Got fatal exception while reinitializing TaskTracker: " +
- StringUtils.stringifyException(iex));
+ LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
return;
}
}
@@ -2502,6 +2632,20 @@
}
String taskDir = getLocalTaskDir(task.getJobID().toString(),
taskId.toString(), task.isTaskCleanupTask());
+ CleanupQueue cleaner = directoryCleanupThread;
+ boolean cleanupThread = cleaner == null;
+ if (!cleanupThread) {
+ LOG.info("Cannot clean up: no directory cleanup thread");
+ }
+ if (taskDir == null) {
+ throw new IOException("taskDir==null");
+ }
+ if(localJobConf==null) {
+ throw new IOException("localJobConf==null");
+ }
+ if (defaultJobConf == null) {
+ throw new IOException("defaultJobConf==null");
+ }
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2785,7 +2929,17 @@
String getName() {
return taskTrackerName;
}
-
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the name of this service
+ */
+ @Override
+ public String getServiceName() {
+ return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+ }
+
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
boolean sendCounters) {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2902,7 +3056,9 @@
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
(conf.getBoolean("tasktracker.contention.tracking", false));
- new TaskTracker(conf).run();
+ TaskTracker tracker = new TaskTracker(conf, false);
+ Service.deploy(tracker);
+ tracker.run();
} catch (Throwable e) {
LOG.error("Can not start task tracker because "+
StringUtils.stringifyException(e));
@@ -3220,8 +3376,70 @@
try {
purgeTask(tip, wasFailure); // Marking it as failed/killed.
} catch (IOException ioe) {
- LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+ LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
}
}
}
+
+ /**
+ * Cleanup queue that can process actions to kill a job or task
+ */
+ private class TaskCleanupThread extends Daemon {
+
+ /**
+ * flag to halt work
+ */
+ private volatile boolean live = true;
+
+
+ /**
+ * Construct a daemon thread.
+ */
+ private TaskCleanupThread() {
+ setName("Task Tracker Task Cleanup Thread");
+ }
+
+ /**
+ * End the daemon. This is done by setting the live flag to false and
+ * interrupting ourselves.
+ */
+ public void terminate() {
+ live = false;
+ interrupt();
+ }
+
+ /**
+ * process task kill actions until told to stop being live.
+ */
+ public void run() {
+ LOG.debug("Task cleanup thread started");
+ while (live) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ TaskInProgress tip;
+ KillTaskAction killAction = (KillTaskAction) action;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " +
+ killAction.getTaskID());
+ purgeTask(tip, false);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (InterruptedException except) {
+ //interrupted. this may have reset the live flag
+ } catch (Throwable except) {
+ LOG.warn("Exception in Cleanup thread: " + except,
+ except);
+ }
+ }
+ LOG.debug("Task cleanup thread ending");
+ }
+ }
+
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 12 17:50:03 2009
@@ -107,6 +107,15 @@
return actionType;
}
+ /**
+ * {@inheritDoc}
+ * @return the action type.
+ */
+ @Override
+ public String toString() {
+ return "TaskTrackerAction: " + actionType;
+ }
+
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, actionType);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 12 17:50:03 2009
@@ -301,6 +301,18 @@
}
/**
+ * String value prints the basic status of the task tracker
+ * @return a string value for diagnostics
+ */
+ @Override
+ public String toString() {
+ return trackerName
+ + " at http://" + host + ":" + httpPort + "/"
+ + " current task count: " + taskReports.size()
+ + " failed task count: " + failures;
+ }
+
+ /**
* Return the {@link ResourceStatus} object configured with this
* status.
*
Modified: hadoop/core/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hadoop-site.xml?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hadoop-site.xml (original)
+++ hadoop/core/trunk/src/test/hadoop-site.xml Thu Mar 12 17:50:03 2009
@@ -11,4 +11,10 @@
+<property>
+ <name>dfs.datanode.ipc.address</name>
+ <value>localhost:50020</value>
+ <description>address for datanodes is always the localhost. This makes for
+ a very fast test setup</description>
+</property>
</configuration>
Modified: hadoop/core/trunk/src/test/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/log4j.properties?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/log4j.properties (original)
+++ hadoop/core/trunk/src/test/log4j.properties Thu Mar 12 17:50:03 2009
@@ -4,4 +4,16 @@
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %x- %m%n
+
+#this is a logger that prints line numbers; it is unused but can be switched
+#on if desired
+log4j.appender.linenumbers=org.apache.log4j.ConsoleAppender
+log4j.appender.linenumbers.layout=org.apache.log4j.PatternLayout
+log4j.appender.linenumbers.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) %x - %m%n
+
+#log4j.logger.org.apache.hadoop=ERROR
+#log4j.logger.org.apache.hadoop.util.Service=DEBUG
+#log4j.logger.org.smartfrog.services.hadoop=DEBUG
+#log4j.logger.org.apache.hadoop.mapred=DEBUG
+#log4j.logger.org.apache.hadoop.ipc=DEBUG
Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.conf;
+
+import junit.framework.TestCase;
+
+import java.util.Properties;
+
+/**
+ * Created 21-Jan-2009 13:42:36
+ */
+
+public class TestConfigurationSubclass extends TestCase {
+ private static final String EMPTY_CONFIGURATION_XML
+ = "/org/apache/hadoop/conf/empty-configuration.xml";
+
+
+ public void testGetProps() {
+ SubConf conf = new SubConf(true);
+ Properties properties = conf.getProperties();
+ assertNotNull("hadoop.tmp.dir is not set",
+ properties.getProperty("hadoop.tmp.dir"));
+ }
+
+ public void testReload() throws Throwable {
+ SubConf conf = new SubConf(true);
+ assertFalse(conf.isReloaded());
+ Configuration.addDefaultResource(EMPTY_CONFIGURATION_XML);
+ assertTrue(conf.isReloaded());
+ Properties properties = conf.getProperties();
+ }
+
+ public void testReloadNotQuiet() throws Throwable {
+ SubConf conf = new SubConf(true);
+ conf.setQuietMode(false);
+ assertFalse(conf.isReloaded());
+ conf.addResource("not-a-valid-resource");
+ assertTrue(conf.isReloaded());
+ try {
+ Properties properties = conf.getProperties();
+ fail("Should not have got here");
+ } catch (RuntimeException e) {
+ assertTrue(e.toString(),e.getMessage().contains("not found"));
+ }
+ }
+
+ private static class SubConf extends Configuration {
+
+ private boolean reloaded;
+
+ /**
+ * A new configuration where the behavior of reading from the default resources
+ * can be turned off.
+ *
+ * If the parameter {@code loadDefaults} is false, the new instance will not
+ * load resources from the default files.
+ *
+ * @param loadDefaults specifies whether to load from the default files
+ */
+ private SubConf(boolean loadDefaults) {
+ super(loadDefaults);
+ }
+
+ public Properties getProperties() {
+ return super.getProps();
+ }
+
+ /**
+ * {@inheritDoc}.
+ * Sets the reloaded flag.
+ */
+ @Override
+ public void reloadConfiguration() {
+ super.reloadConfiguration();
+ reloaded = true;
+ }
+
+ public boolean isReloaded() {
+ return reloaded;
+ }
+
+ public void setReloaded(boolean reloaded) {
+ this.reloaded = reloaded;
+ }
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.conf;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.net.NetUtils;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Created 22-Jan-2009 14:15:59
+ */
+
+public class TestGetServerAddress extends TestCase {
+ private Configuration conf;
+ private static final String ADDRESS_TUPLE = "addressTuple";
+ private static final String BIND_ADDRESS_PORT = "bindAddressPort";
+ private static final String BIND_ADDRESS_NAME = "bindAddressName";
+ private static final String NOT_A_HOST_PORT_PAIR = "Not a host:port pair: ";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf = new Configuration();
+ }
+
+ private String lookup() {
+ String address = NetUtils.getServerAddress(conf,
+ BIND_ADDRESS_NAME,
+ BIND_ADDRESS_PORT,
+ ADDRESS_TUPLE);
+ assertNotNull("Null string for the server address", address);
+ return address;
+ }
+
+ private void setAddressTuple(String value) {
+ conf.set(ADDRESS_TUPLE, value);
+ }
+
+ private void setAddressPort(String value) {
+ conf.set(BIND_ADDRESS_PORT, value);
+ }
+
+ private void setAddressName(String value) {
+ conf.set(BIND_ADDRESS_NAME, value);
+ }
+
+ private void assertContains(String expected, Throwable throwable) {
+ assertContains(expected, throwable.getMessage());
+ }
+
+ private void assertContains(String expected, String value) {
+ assertNotNull("Expected " + expected + " got null string", value);
+ assertTrue("No \"" + expected + "\" in \"" + value + "\"",
+ value.contains(expected));
+ }
+
+ private void expectLookupFailure(String exceptionText) {
+ try {
+ String address = lookup();
+ fail("Expected an an exception, got " + address);
+ } catch (IllegalArgumentException expected) {
+ assertContains(exceptionText, expected);
+ }
+ }
+
+ public void testNoValues() {
+ expectLookupFailure(ADDRESS_TUPLE);
+ }
+
+ public void testPortHasPriority() throws Throwable {
+ setAddressTuple("name:8080");
+ String port = "1234";
+ setAddressPort(port);
+ assertContains(port, lookup());
+ }
+
+
+ public void testNameHasPriority() throws Throwable {
+ setAddressTuple("name:8080");
+ String name = "localhost";
+ setAddressName(name);
+ assertContains(name, lookup());
+ }
+
+ public void testNameAndPort() throws Throwable {
+ setAddressName("name");
+ setAddressPort("8080");
+ expectLookupFailure("No value for addressTuple");
+ }
+
+ public void testNameNoPort() throws Throwable {
+ setAddressName("name");
+ expectLookupFailure("No value for addressTuple");
+ }
+
+
+ public void testEmptyString() throws Throwable {
+ setAddressTuple("");
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, -1);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (RuntimeException expected) {
+ assertContains(NOT_A_HOST_PORT_PAIR, expected);
+ }
+ }
+
+
+ public void testAddressIsNotATuple() {
+ setAddressTuple("localhost");
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, -1);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (RuntimeException expected) {
+ assertContains(NOT_A_HOST_PORT_PAIR, expected);
+ assertContains("localhost", expected);
+ }
+ }
+
+ public void testAddressIsATriple() {
+ setAddressTuple("localhost:8080:1234");
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (NumberFormatException expected) {
+ assertContains("8080:1234", expected);
+ }
+ }
+
+ public void testBindAddressIsATriple() {
+ setAddressPort("8080:1234");
+ setAddressTuple("localhost:8");
+ //non-tuples are not picked up when the socket is created
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (NumberFormatException expected) {
+ assertContains("8080:1234", expected);
+ }
+ }
+
+ public void testAddressPortIsInvalid() {
+ setAddressTuple("localhost:twelve");
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (NumberFormatException expected) {
+ assertContains("twelve", expected);
+ }
+ }
+
+ public void testAddressPortIsSigned() {
+ setAddressPort("-23");
+ setAddressTuple("localhost:8");
+ String address = lookup();
+ try {
+ InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+ fail("Expected an an exception, got " + address + " and hence " + addr);
+ } catch (IllegalArgumentException expected) {
+ assertContains("port out of range:-23", expected);
+ }
+ }
+
+
+}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 12 17:50:03 2009
@@ -79,7 +79,7 @@
FileOutputFormat.setOutputPath(conf, outDir);
- JobClient.runJob(conf);
+ runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
@@ -100,4 +100,14 @@
}
+ /**
+ * Run a job, getting the timeout from a system property
+ * @param conf job configuration to use
+ * @throws IOException for any problem, including job failure
+ */
+ private void runJob(JobConf conf) throws IOException {
+ long timeout = Long.getLong("test.jobclient.timeout", 0);
+ JobClient.runJob(conf, timeout);
+ }
+
}
\ No newline at end of file
Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml Thu Mar 12 17:50:03 2009
@@ -0,0 +1,4 @@
+<?xml version="1.0"?>
+<configuration>
+</configuration>
+
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 12 17:50:03 2009
@@ -64,6 +64,15 @@
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
+ private MiniDFSCluster cluster;
+
+ /**
+ * Close the cluster a test started (if any); super.tearDown() still runs if closing throws
+ */
+ protected void tearDown() throws Exception {
+ try { MiniDFSCluster.close(cluster); }
+ finally { super.tearDown(); }
+ }
/** class MyFile contains enough information to recreate the contents of
* a single file.
@@ -269,8 +278,6 @@
/** copy files from dfs file system to dfs file system */
public void testCopyFromDfsToDfs() throws Exception {
String namenode = null;
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -291,15 +298,10 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** copy files from local file system to dfs file system */
public void testCopyFromLocalToDfs() throws Exception {
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 1, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -319,15 +321,10 @@
deldir(hdfs, "/logs");
deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** copy files from dfs file system to local file system */
public void testCopyFromDfsToLocal() throws Exception {
- MiniDFSCluster cluster = null;
- try {
Configuration conf = new Configuration();
final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -348,16 +345,11 @@
deldir(hdfs, "/logs");
deldir(hdfs, "/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(conf, 2, true, null);
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = hdfs.getUri().toString();
if (namenode.startsWith("hdfs://")) {
@@ -408,9 +400,7 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
+
}
public void testCopyDuplication() throws Exception {
@@ -486,11 +476,9 @@
public void testPreserveOption() throws Exception {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = null;
- try {
- cluster = new MiniDFSCluster(conf, 2, true, null);
- String nnUri = FileSystem.getDefaultUri(conf).toString();
- FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ String nnUri = FileSystem.getDefaultUri(conf).toString();
+ FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
{//test preserving user
MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -551,19 +539,15 @@
deldir(fs, "/destdat");
deldir(fs, "/srcdat");
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
}
- }
public void testMapCount() throws Exception {
String namenode = null;
- MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
try {
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(conf, 3, true, null);
- FileSystem fs = dfs.getFileSystem();
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs = cluster.getFileSystem();
final FsShell shell = new FsShell(conf);
namenode = fs.getUri().toString();
mr = new MiniMRCluster(3, namenode, 1);
@@ -604,8 +588,7 @@
assertTrue("Unexpected map count, logs.length=" + logs.length,
logs.length == 2);
} finally {
- if (dfs != null) { dfs.shutdown(); }
- if (mr != null) { mr.shutdown(); }
+ MiniMRCluster.close(mr);
}
}
@@ -714,9 +697,7 @@
}
public void testHftpAccessControl() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
+ final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
final UnixUserGroupInformation USER_UGI = createUGI("user", false);
//start cluster by DFS_UGI
@@ -750,9 +731,6 @@
fs.setPermission(srcrootpath, new FsPermission((short)0));
assertEquals(-3, ToolRunner.run(distcp, args));
}
- } finally {
- if (cluster != null) { cluster.shutdown(); }
- }
}
/** test -delete */
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 12 17:50:03 2009
@@ -65,25 +65,25 @@
private static final long MEGA = 1024 * 1024;
private static final int SEEKS_PER_FILE = 4;
- private static String ROOT = System.getProperty("test.build.data","fs_test");
- private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
- private static Path WRITE_DIR = new Path(ROOT, "fs_write");
- private static Path READ_DIR = new Path(ROOT, "fs_read");
- private static Path DATA_DIR = new Path(ROOT, "fs_data");
+ private static final String ROOT = System.getProperty("test.build.data","fs_test");
+ private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
+ private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
+ private static final Path READ_DIR = new Path(ROOT, "fs_read");
+ private static final Path DATA_DIR = new Path(ROOT, "fs_data");
 public void testFs() throws Exception {
- testFs(10 * MEGA, 100, 0);
+ createTestFs(10 * MEGA, 100, 0); // seed 0 => createTestFs substitutes a random seed
 }
- public static void testFs(long megaBytes, int numFiles, long seed)
+ private static void createTestFs(long megaBytes, int numFiles, long seed)
throws Exception {
FileSystem fs = FileSystem.get(conf);
- if (seed == 0)
+ if (seed == 0) {
seed = new Random().nextLong();
-
- LOG.info("seed = "+seed);
+ LOG.info("seed = " + seed);
+ }
createControlFile(fs, megaBytes, numFiles, seed);
writeTest(fs, false);
@@ -553,7 +553,7 @@
}
}
} finally {
- if (cluster != null) cluster.shutdown();
+ MiniDFSCluster.close(cluster);
}
}
@@ -563,30 +563,44 @@
fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
}
- public void testFsClose() throws Exception {
- {
- Configuration conf = new Configuration();
- new Path("file:///").getFileSystem(conf);
- UnixUserGroupInformation.login(conf, true);
- FileSystem.closeAll();
- }
+ public void testCloseFileFS() throws Exception { // obtain the file:/// FileSystem, log in, then closeAll()
+ final Configuration localConf = new Configuration();
+ new Path("file:///").getFileSystem(localConf);
+ UnixUserGroupInformation.login(localConf, true);
+ FileSystem.closeAll();
+ }
- {
+ public void testCloseHftpFS() throws Exception { // obtain an hftp FileSystem, log in on the same conf, then closeAll()
 Configuration conf = new Configuration();
 new Path("hftp://localhost:12345/").getFileSystem(conf);
 UnixUserGroupInformation.login(conf, true);
 FileSystem.closeAll();
- }
+ }
- {
+ public void testCloseHftpFSAltLogin() throws Exception { // log in via the FileSystem's own conf, then closeAll()
+ final Path hftpRoot = new Path("hftp://localhost:12345/");
+ final FileSystem hftp = hftpRoot.getFileSystem(new Configuration());
+ UnixUserGroupInformation.login(hftp.getConf(), true);
+ FileSystem.closeAll();
+ }
+
+
+ public void testCloseHDFS() throws Exception { // closeAll() after login, against a live MiniDFSCluster
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+ URI uri = cluster.getFileSystem().getUri();
+ FileSystem fs = FileSystem.get(uri, new Configuration());
+ checkPath(cluster, fs); // presumably checks an upper-cased host name is accepted — confirm against checkPath
 Configuration conf = new Configuration();
- FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
- UnixUserGroupInformation.login(fs.getConf(), true);
+ new Path(uri.toString()).getFileSystem(conf); // fetch the same HDFS through a Path before logging in
+ UnixUserGroupInformation.login(conf, true);
 FileSystem.closeAll();
+ } finally {
+ MiniDFSCluster.close(cluster); // cluster may still be null here if startup failed
 }
 }
-
public void testCacheKeysAreCaseInsensitive()
throws Exception
{