You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/12 20:43:06 UTC
svn commit: r752984 [2/2] - in /hadoop/core/trunk: conf/ ivy/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/
src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/net/ ...
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 12 19:43:05 2009
@@ -81,12 +81,11 @@
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.Service;
-import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -95,19 +94,13 @@
* in a networked environment. It contacts the JobTracker
* for Task assignments and reporting results.
*
- *
- * The TaskTracker has a complex lifecycle in that it
- * can be "recycled"; after {@link #closeTaskTracker()} is called,
- * it can be reset using {@link #initialize()}. This is
- * within the {@link Service} lifecycle.
*******************************************************/
-public class TaskTracker extends Service
+public class TaskTracker
implements MRConstants, TaskUmbilicalProtocol, Runnable {
- /** time to wait for a finished task to be reported as done: {@value}*/
- private static final long WAIT_FOR_DONE = 3 * 1000;
- int httpPort;
+ static final long WAIT_FOR_DONE = 3 * 1000;
+ private int httpPort;
- enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+ static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
static{
Configuration.addDefaultResource("mapred-default.xml");
@@ -126,10 +119,7 @@
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
- /**
- * Flag used to synchronize running state across threads.
- */
- private volatile boolean running = false;
+ volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
String taskTrackerName;
@@ -144,7 +134,7 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
- /**
+ /*
* This is the last 'status' report sent by this tracker to the JobTracker.
*
* If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -154,15 +144,14 @@
*/
TaskTrackerStatus status = null;
- /** The system-directory on HDFS where job files are stored */
+ // The system-directory on HDFS where job files are stored
Path systemDirectory = null;
- /** The filesystem where job files are stored */
+ // The filesystem where job files are stored
FileSystem systemFS = null;
- private HttpServer server;
+ private final HttpServer server;
- /** Flag used to synchronize startup across threads. */
volatile boolean shuttingDown = false;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -355,7 +344,33 @@
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
- private TaskCleanupThread taskCleanupThread;
+ private Thread taskCleanupThread =
+ new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ TaskTrackerAction action = tasksToCleanup.take();
+ if (action instanceof KillJobAction) {
+ purgeJob((KillJobAction) action);
+ } else if (action instanceof KillTaskAction) {
+ TaskInProgress tip;
+ KillTaskAction killAction = (KillTaskAction) action;
+ synchronized (TaskTracker.this) {
+ tip = tasks.get(killAction.getTaskID());
+ }
+ LOG.info("Received KillTaskAction for task: " +
+ killAction.getTaskID());
+ purgeTask(tip, false);
+ } else {
+ LOG.error("Non-delete action given to cleanup thread: "
+ + action);
+ }
+ } catch (Throwable except) {
+ LOG.warn(StringUtils.stringifyException(except));
+ }
+ }
+ }
+ }, "taskCleanup");
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
@@ -466,24 +481,11 @@
}
/**
- * Initialize the connection.
- * This method will block until a job tracker is found
- * It's in a separate method
+ * Do the real constructor work here. It's in a separate method
* so we can call it again and "recycle" the object after calling
* close().
*/
synchronized void initialize() throws IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Initializing Task Tracker: " + this);
- }
- //check that the server is not already live.
-
- //allow this operation in only two service states: started and live
- verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
-
- //flip the running switch for our inner threads
- running = true;
-
// use configured nameserver & interface to get local hostname
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
@@ -570,17 +572,10 @@
DistributedCache.purgeCache(this.fConf);
cleanupStorage();
- //mark as just started; this is used in heartbeats
- this.justStarted = true;
- int connectTimeout = fConf
- .getInt("mapred.task.tracker.connect.timeout", 60000);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
- jobTrackAddr, fConf, connectTimeout);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connected to JobTracker at " + jobTrackAddr);
- }
+ jobTrackAddr, this.fConf);
this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
@@ -616,9 +611,7 @@
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- if (fConf != null) {
- fConf.deleteLocalFiles();
- }
+ this.fConf.deleteLocalFiles();
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -897,117 +890,25 @@
}
}
- /////////////////////////////////////////////////////
- // Service Lifecycle
- /////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- * @throws IOException for any problem.
- */
- @Override
- protected synchronized void innerStart() throws IOException {
- JobConf conf = fConf;
- maxCurrentMapTasks = conf.getInt(
- "mapred.tasktracker.map.tasks.maximum", 2);
- maxCurrentReduceTasks = conf.getInt(
- "mapred.tasktracker.reduce.tasks.maximum", 2);
- this.jobTrackAddr = JobTracker.getAddress(conf);
- String infoAddr =
- NetUtils.getServerAddress(conf,
- "tasktracker.http.bindAddress",
- "tasktracker.http.port",
- "mapred.task.tracker.http.address");
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
- String httpBindAddress = infoSocAddr.getHostName();
- int port = infoSocAddr.getPort();
- this.server = new HttpServer("task", httpBindAddress, port,
- port == 0, conf);
- workerThreads = conf.getInt("tasktracker.http.threads", 40);
- this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
- server.setThreads(1, workerThreads);
- // let the jsp pages get to the task tracker, config, and other relevant
- // objects
- FileSystem local = FileSystem.getLocal(conf);
- this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
- server.setAttribute("task.tracker", this);
- server.setAttribute("local.file.system", local);
- server.setAttribute("conf", conf);
- server.setAttribute("log", LOG);
- server.setAttribute("localDirAllocator", localDirAllocator);
- server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
- server.addInternalServlet("mapOutput", "/mapOutput",
- MapOutputServlet.class);
- server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
- server.start();
- this.httpPort = server.getPort();
- initialize();
- }
-
- /**
- * {@inheritDoc}
- *
- * @param status a status that can be updated with problems
- * @throws IOException for any problem
- */
- @Override
- public void innerPing(ServiceStatus status) throws IOException {
- if (server == null || !server.isAlive()) {
- status.addThrowable(
- new IOException("TaskTracker HttpServer is not running on port "
- + httpPort));
- }
- if (taskReportServer == null) {
- status.addThrowable(
- new IOException("TaskTracker Report Server is not running on "
- + taskReportAddress));
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws IOException exceptions which will be logged
- */
- @Override
- protected void innerClose() throws IOException {
- synchronized (this) {
- shuttingDown = true;
- closeTaskTracker();
- if (this.server != null) {
- try {
- LOG.info("Shutting down StatusHttpServer");
- this.server.stop();
+ public synchronized void shutdown() throws IOException {
+ shuttingDown = true;
+ close();
+ if (this.server != null) {
+ try {
+ LOG.info("Shutting down StatusHttpServer");
+ this.server.stop();
} catch (Exception e) {
LOG.warn("Exception shutting down TaskTracker", e);
- }
}
- stopCleanupThreads();
}
}
-
- /**
- * A shutdown request triggers termination
- * @throws IOException when errors happen during termination
- */
- public synchronized void shutdown() throws IOException {
- close();
- }
-
/**
* Close down the TaskTracker and all its components. We must also shutdown
* any running tasks or threads, and cleanup disk space. A new TaskTracker
* within the same process space might be restarted, so everything must be
* clean.
- * @throws IOException when errors happen during shutdown
*/
- public synchronized void closeTaskTracker() throws IOException {
- if (!running) {
- //this operation is a no-op when not already running
- return;
- }
- running = false;
+ public synchronized void close() throws IOException {
//
// Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
// because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1019,37 +920,27 @@
tip.jobHasFinished(false);
}
+ this.running = false;
+
// Clear local storage
cleanupStorage();
// Shutdown the fetcher thread
- if (mapEventsFetcher != null) {
- mapEventsFetcher.interrupt();
- }
+ this.mapEventsFetcher.interrupt();
//stop the launchers
- if (mapLauncher != null) {
- mapLauncher.cleanTaskQueue();
- mapLauncher.interrupt();
- }
- if (reduceLauncher != null) {
- reduceLauncher.cleanTaskQueue();
- reduceLauncher.interrupt();
- }
+ this.mapLauncher.interrupt();
+ this.reduceLauncher.interrupt();
+
+ jvmManager.stop();
- if (jvmManager != null) {
- jvmManager.stop();
- }
-
// shutdown RPC connections
RPC.stopProxy(jobClient);
// wait for the fetcher thread to exit
for (boolean done = false; !done; ) {
try {
- if(mapEventsFetcher != null) {
- mapEventsFetcher.join();
- }
+ this.mapEventsFetcher.join();
done = true;
} catch (InterruptedException e) {
}
@@ -1062,54 +953,52 @@
}
/**
- * Create and start a task tracker.
- * Subclasses must not subclass this constructor, as it may
- * call their initialisation/startup methods before the constructor
- * is complete.
- * It is here for backwards compatibility.
- * @param conf configuration
- * @throws IOException for problems on startup
+ * Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
- this(conf, true);
- }
-
- /**
- * Subclasses should extend this constructor and pass start=false to the
- * superclass to avoid race conditions in constructors and threads.
- * @param conf configuration
- * @param start flag to set to true to start the daemon
- * @throws IOException for problems on startup
- */
- protected TaskTracker(JobConf conf, boolean start) throws IOException {
- super(conf);
fConf = conf;
- //for backwards compatibility, the task tracker starts up unless told not
- //to. Subclasses should be very cautious about having their superclass
- //do that as subclassed methods can be invoked before the class is fully
- //configured
- if(start) {
- deploy(this);
- }
+ maxCurrentMapTasks = conf.getInt(
+ "mapred.tasktracker.map.tasks.maximum", 2);
+ maxCurrentReduceTasks = conf.getInt(
+ "mapred.tasktracker.reduce.tasks.maximum", 2);
+ this.jobTrackAddr = JobTracker.getAddress(conf);
+ String infoAddr =
+ NetUtils.getServerAddress(conf,
+ "tasktracker.http.bindAddress",
+ "tasktracker.http.port",
+ "mapred.task.tracker.http.address");
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+ String httpBindAddress = infoSocAddr.getHostName();
+ int httpPort = infoSocAddr.getPort();
+ this.server = new HttpServer("task", httpBindAddress, httpPort,
+ httpPort == 0, conf);
+ workerThreads = conf.getInt("tasktracker.http.threads", 40);
+ this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+ server.setThreads(1, workerThreads);
+ // let the jsp pages get to the task tracker, config, and other relevant
+ // objects
+ FileSystem local = FileSystem.getLocal(conf);
+ this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+ server.setAttribute("task.tracker", this);
+ server.setAttribute("local.file.system", local);
+ server.setAttribute("conf", conf);
+ server.setAttribute("log", LOG);
+ server.setAttribute("localDirAllocator", localDirAllocator);
+ server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+ server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+ server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+ server.start();
+ this.httpPort = server.getPort();
+ initialize();
}
private void startCleanupThreads() throws IOException {
- taskCleanupThread = new TaskCleanupThread();
+ taskCleanupThread.setDaemon(true);
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
}
/**
- * Tell the cleanup threads that they should end themselves
- */
- private void stopCleanupThreads() {
- if (taskCleanupThread != null) {
- taskCleanupThread.terminate();
- taskCleanupThread = null;
- }
- }
-
- /**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
*/
@@ -1156,7 +1045,6 @@
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
- boolean restartingService = true;
while (running && !shuttingDown) {
try {
@@ -1172,7 +1060,6 @@
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
if(justInited) {
- LOG.debug("Checking build version with JobTracker");
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
@@ -1182,7 +1069,7 @@
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
- LOG.info("Problem reporting to jobtracker: " + e, e);
+ LOG.info("Problem reporting to jobtracker: " + e);
}
return State.DENIED;
}
@@ -1193,9 +1080,6 @@
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
- if(LOG.isDebugEnabled()) {
- LOG.debug("System directory is " + systemDirectory);
- }
}
// Send the heartbeat and process the jobtracker's directives
@@ -1248,15 +1132,6 @@
return State.STALE;
}
- //At this point the job tracker is present and compatible,
- //so the service is coming up.
- //It is time to declare it as such
- if (restartingService) {
- //declare the service as live.
- enterLiveState();
- restartingService = false;
- }
-
// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
@@ -1282,16 +1157,15 @@
//we've cleaned up, resume normal operation
if (!acceptNewTasks && isIdle()) {
- LOG.info("ready to accept new tasks again");
acceptNewTasks=true;
}
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
return State.INTERRUPTED;
} catch (DiskErrorException de) {
- String msg = "Exiting task tracker for disk error:\n" +
+ String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
- LOG.error(msg, de);
+ LOG.error(msg);
synchronized (this) {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
@@ -1303,9 +1177,10 @@
LOG.info("Tasktracker disallowed by JobTracker.");
return State.DENIED;
}
- } catch (IOException except) {
- String msg = "Caught exception: " + except;
- LOG.error(msg, except);
+ } catch (Exception except) {
+ String msg = "Caught exception: " +
+ StringUtils.stringifyException(except);
+ LOG.error(msg);
}
}
@@ -1616,7 +1491,6 @@
localMinSpaceKill = minSpaceKill;
}
if (!enoughFreeSpace(localMinSpaceKill)) {
- LOG.info("Tasktracker running out of space -not accepting new tasks");
acceptNewTasks=false;
//we give up! do not accept new tasks until
//all the ones running have finished and they're all cleared up
@@ -1878,7 +1752,7 @@
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
- LOG.warn(msg, e);
+ LOG.warn(msg);
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
@@ -1938,22 +1812,17 @@
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
- //enter the started state; we are no longer live
- enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
- LOG.info("Interrupted while waiting for the job tracker", ie);
}
}
}
}
} finally {
- closeTaskTracker();
- }
- if (shuttingDown) {
- return;
+ close();
}
+ if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
@@ -1961,7 +1830,8 @@
shutdown();
}
} catch (IOException iex) {
- LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
+ LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+ StringUtils.stringifyException(iex));
return;
}
}
@@ -2632,20 +2502,6 @@
}
String taskDir = getLocalTaskDir(task.getJobID().toString(),
taskId.toString(), task.isTaskCleanupTask());
- CleanupQueue cleaner = directoryCleanupThread;
- boolean cleanupThread = cleaner == null;
- if (!cleanupThread) {
- LOG.info("Cannot clean up: no directory cleanup thread");
- }
- if (taskDir == null) {
- throw new IOException("taskDir==null");
- }
- if(localJobConf==null) {
- throw new IOException("localJobConf==null");
- }
- if (defaultJobConf == null) {
- throw new IOException("defaultJobConf==null");
- }
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2929,17 +2785,7 @@
String getName() {
return taskTrackerName;
}
-
- /**
- * {@inheritDoc}
- *
- * @return the name of this service
- */
- @Override
- public String getServiceName() {
- return taskTrackerName != null ? taskTrackerName : "Task Tracker";
- }
-
+
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
boolean sendCounters) {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -3056,9 +2902,7 @@
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
(conf.getBoolean("tasktracker.contention.tracking", false));
- TaskTracker tracker = new TaskTracker(conf, false);
- Service.deploy(tracker);
- tracker.run();
+ new TaskTracker(conf).run();
} catch (Throwable e) {
LOG.error("Can not start task tracker because "+
StringUtils.stringifyException(e));
@@ -3376,70 +3220,8 @@
try {
purgeTask(tip, wasFailure); // Marking it as failed/killed.
} catch (IOException ioe) {
- LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
+ LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
}
}
}
-
- /**
- * Cleanup queue that can process actions to kill a job or task
- */
- private class TaskCleanupThread extends Daemon {
-
- /**
- * flag to halt work
- */
- private volatile boolean live = true;
-
-
- /**
- * Construct a daemon thread.
- */
- private TaskCleanupThread() {
- setName("Task Tracker Task Cleanup Thread");
- }
-
- /**
- * End the daemon. This is done by setting the live flag to false and
- * interrupting ourselves.
- */
- public void terminate() {
- live = false;
- interrupt();
- }
-
- /**
- * process task kill actions until told to stop being live.
- */
- public void run() {
- LOG.debug("Task cleanup thread started");
- while (live) {
- try {
- TaskTrackerAction action = tasksToCleanup.take();
- if (action instanceof KillJobAction) {
- purgeJob((KillJobAction) action);
- } else if (action instanceof KillTaskAction) {
- TaskInProgress tip;
- KillTaskAction killAction = (KillTaskAction) action;
- synchronized (TaskTracker.this) {
- tip = tasks.get(killAction.getTaskID());
- }
- LOG.info("Received KillTaskAction for task: " +
- killAction.getTaskID());
- purgeTask(tip, false);
- } else {
- LOG.error("Non-delete action given to cleanup thread: "
- + action);
- }
- } catch (InterruptedException except) {
- //interrupted. this may have reset the live flag
- } catch (Throwable except) {
- LOG.warn("Exception in Cleanup thread: " + except,
- except);
- }
- }
- LOG.debug("Task cleanup thread ending");
- }
- }
-
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 12 19:43:05 2009
@@ -107,15 +107,6 @@
return actionType;
}
- /**
- * {@inheritDoc}
- * @return the action type.
- */
- @Override
- public String toString() {
- return "TaskTrackerAction: " + actionType;
- }
-
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, actionType);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 12 19:43:05 2009
@@ -301,18 +301,6 @@
}
/**
- * String value prints the basic status of the task tracker
- * @return a string value for diagnostics
- */
- @Override
- public String toString() {
- return trackerName
- + " at http://" + host + ":" + httpPort + "/"
- + " current task count: " + taskReports.size()
- + " failed task count: " + failures;
- }
-
- /**
* Return the {@link ResourceStatus} object configured with this
* status.
*
Modified: hadoop/core/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hadoop-site.xml?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hadoop-site.xml (original)
+++ hadoop/core/trunk/src/test/hadoop-site.xml Thu Mar 12 19:43:05 2009
@@ -11,10 +11,4 @@
-<property>
- <name>dfs.datanode.ipc.address</name>
- <value>localhost:50020</value>
- <description>address for datanodes is always the localhost. This makes for
- a very fast test setup</description>
-</property>
</configuration>
Modified: hadoop/core/trunk/src/test/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/log4j.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/log4j.properties (original)
+++ hadoop/core/trunk/src/test/log4j.properties Thu Mar 12 19:43:05 2009
@@ -4,16 +4,4 @@
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %x- %m%n
-
-#this is a logger that prints line numbers; it is unused but can be switched
-#on if desired
-log4j.appender.linenumbers=org.apache.log4j.ConsoleAppender
-log4j.appender.linenumbers.layout=org.apache.log4j.PatternLayout
-log4j.appender.linenumbers.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) %x - %m%n
-
-#log4j.logger.org.apache.hadoop=ERROR
-#log4j.logger.org.apache.hadoop.util.Service=DEBUG
-#log4j.logger.org.smartfrog.services.hadoop=DEBUG
-#log4j.logger.org.apache.hadoop.mapred=DEBUG
-#log4j.logger.org.apache.hadoop.ipc=DEBUG
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 12 19:43:05 2009
@@ -79,7 +79,7 @@
FileOutputFormat.setOutputPath(conf, outDir);
- runJob(conf);
+ JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
@@ -100,14 +100,4 @@
}
- /**
- * Run a job, getting the timeout from a system property
- * @param conf job configuration to use
- * @throws IOException for any problem, including job failure
- */
- private void runJob(JobConf conf) throws IOException {
- long timeout = Long.getLong("test.jobclient.timeout", 0);
- JobClient.runJob(conf, timeout);
- }
-
}
\ No newline at end of file
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 12 19:43:05 2009
@@ -64,15 +64,6 @@
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
- private MiniDFSCluster cluster;
-
- /**
- * terminate any non-null cluster
- */
- protected void tearDown() throws Exception {
- super.tearDown();
- MiniDFSCluster.close(cluster);
- }
/** class MyFile contains enough information to recreate the contents of
* a single file.
@@ -278,6 +269,8 @@
/** copy files from dfs file system to dfs file system */
public void testCopyFromDfsToDfs() throws Exception {
String namenode = null;
+ MiniDFSCluster cluster = null;
+ try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -298,10 +291,15 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
}
/** copy files from local file system to dfs file system */
public void testCopyFromLocalToDfs() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster(conf, 1, true, null);
final FileSystem hdfs = cluster.getFileSystem();
@@ -321,10 +319,15 @@
deldir(hdfs, "/logs");
deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
}
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
}
/** copy files from dfs file system to local file system */
public void testCopyFromDfsToLocal() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
Configuration conf = new Configuration();
final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -345,11 +348,16 @@
deldir(hdfs, "/logs");
deldir(hdfs, "/srcdat");
}
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
}
public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
- Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(conf, 2, true, null);
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = hdfs.getUri().toString();
if (namenode.startsWith("hdfs://")) {
@@ -400,7 +408,9 @@
deldir(hdfs, "/srcdat");
deldir(hdfs, "/logs");
}
-
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
}
public void testCopyDuplication() throws Exception {
@@ -476,9 +486,11 @@
public void testPreserveOption() throws Exception {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(conf, 2, true, null);
- String nnUri = FileSystem.getDefaultUri(conf).toString();
- FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ String nnUri = FileSystem.getDefaultUri(conf).toString();
+ FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
{//test preserving user
MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -539,15 +551,19 @@
deldir(fs, "/destdat");
deldir(fs, "/srcdat");
}
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
}
+ }
public void testMapCount() throws Exception {
String namenode = null;
+ MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(conf, 3, true, null);
- FileSystem fs = cluster.getFileSystem();
+ dfs = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs = dfs.getFileSystem();
final FsShell shell = new FsShell(conf);
namenode = fs.getUri().toString();
mr = new MiniMRCluster(3, namenode, 1);
@@ -588,7 +604,8 @@
assertTrue("Unexpected map count, logs.length=" + logs.length,
logs.length == 2);
} finally {
- MiniMRCluster.close(mr);
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown(); }
}
}
@@ -697,7 +714,9 @@
}
public void testHftpAccessControl() throws Exception {
- final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
+ MiniDFSCluster cluster = null;
+ try {
+ final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
final UnixUserGroupInformation USER_UGI = createUGI("user", false);
//start cluster by DFS_UGI
@@ -731,6 +750,9 @@
fs.setPermission(srcrootpath, new FsPermission((short)0));
assertEquals(-3, ToolRunner.run(distcp, args));
}
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
}
/** test -delete */
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 12 19:43:05 2009
@@ -65,25 +65,25 @@
private static final long MEGA = 1024 * 1024;
private static final int SEEKS_PER_FILE = 4;
- private static final String ROOT = System.getProperty("test.build.data","fs_test");
- private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
- private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
- private static final Path READ_DIR = new Path(ROOT, "fs_read");
- private static final Path DATA_DIR = new Path(ROOT, "fs_data");
+ private static String ROOT = System.getProperty("test.build.data","fs_test");
+ private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
+ private static Path WRITE_DIR = new Path(ROOT, "fs_write");
+ private static Path READ_DIR = new Path(ROOT, "fs_read");
+ private static Path DATA_DIR = new Path(ROOT, "fs_data");
public void testFs() throws Exception {
- createTestFs(10 * MEGA, 100, 0);
+ testFs(10 * MEGA, 100, 0);
}
- private static void createTestFs(long megaBytes, int numFiles, long seed)
+ public static void testFs(long megaBytes, int numFiles, long seed)
throws Exception {
FileSystem fs = FileSystem.get(conf);
- if (seed == 0) {
+ if (seed == 0)
seed = new Random().nextLong();
- LOG.info("seed = " + seed);
- }
+
+ LOG.info("seed = "+seed);
createControlFile(fs, megaBytes, numFiles, seed);
writeTest(fs, false);
@@ -553,7 +553,7 @@
}
}
} finally {
- MiniDFSCluster.close(cluster);
+ if (cluster != null) cluster.shutdown();
}
}
@@ -563,44 +563,30 @@
fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
}
- public void testCloseFileFS() throws Exception {
- Configuration conf = new Configuration();
- new Path("file:///").getFileSystem(conf);
- UnixUserGroupInformation.login(conf, true);
- FileSystem.closeAll();
- }
+ public void testFsClose() throws Exception {
+ {
+ Configuration conf = new Configuration();
+ new Path("file:///").getFileSystem(conf);
+ UnixUserGroupInformation.login(conf, true);
+ FileSystem.closeAll();
+ }
- public void testCloseHftpFS() throws Exception {
+ {
Configuration conf = new Configuration();
new Path("hftp://localhost:12345/").getFileSystem(conf);
UnixUserGroupInformation.login(conf, true);
FileSystem.closeAll();
- }
-
- public void testCloseHftpFSAltLogin() throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
- UnixUserGroupInformation.login(fs.getConf(), true);
- FileSystem.closeAll();
- }
-
+ }
- public void testCloseHDFS() throws Exception {
- MiniDFSCluster cluster = null;
- try {
- cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
- URI uri = cluster.getFileSystem().getUri();
- FileSystem fs = FileSystem.get(uri, new Configuration());
- checkPath(cluster, fs);
+ {
Configuration conf = new Configuration();
- new Path(uri.toString()).getFileSystem(conf);
- UnixUserGroupInformation.login(conf, true);
+ FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
+ UnixUserGroupInformation.login(fs.getConf(), true);
FileSystem.closeAll();
- } finally {
- MiniDFSCluster.close(cluster);
}
}
+
public void testCacheKeysAreCaseInsensitive()
throws Exception
{
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Mar 12 19:43:05 2009
@@ -25,14 +25,12 @@
import java.nio.channels.FileChannel;
import java.util.Random;
import java.io.RandomAccessFile;
-import java.io.Closeable;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.*;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -45,18 +43,13 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.*;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Service;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* This class creates a single-process DFS cluster for junit testing.
* The data directories for non-simulated DFS are under the testing directory.
* For simulated data nodes, no underlying fs storage is used.
*/
-public class MiniDFSCluster implements Closeable {
- private static final int WAIT_SLEEP_TIME_MILLIS = 500;
- private static final int STARTUP_TIMEOUT_MILLIS = 30000;
+public class MiniDFSCluster {
public class DataNodeProperties {
DataNode datanode;
@@ -70,7 +63,6 @@
}
}
- private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
private Configuration conf;
private NameNode nameNode;
private int numDataNodes;
@@ -289,18 +281,17 @@
}
/**
- * wait for the cluster to get out of
+ * wait for the cluster to get out of
* safemode.
*/
public void waitClusterUp() {
if (numDataNodes > 0) {
- try {
- while (!isClusterUp()) {
- LOG.warn("Waiting for the Mini HDFS Cluster to start...");
- Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
+ while (!isClusterUp()) {
+ try {
+ System.err.println("Waiting for the Mini HDFS Cluster to start...");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
}
- } catch (InterruptedException e) {
- LOG.warn("Interrupted during startup", e);
}
}
}
@@ -332,6 +323,7 @@
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities) throws IOException {
+
int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0
if (conf.get("dfs.blockreport.initialDelay") == null) {
@@ -358,7 +350,7 @@
}
//Generate some hostnames if required
if (racks != null && hosts == null) {
- LOG.info("Generating host names for datanodes");
+ System.out.println("Generating host names for datanodes");
hosts = new String[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -401,16 +393,16 @@
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]);
}
- LOG.info("Starting DataNode " + i + " with dfs.data.dir: "
+ System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
+ dnConf.get("dfs.data.dir"));
if (hosts != null) {
dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
- LOG.info("Starting DataNode " + i + " with hostname set to: "
+ System.out.println("Starting DataNode " + i + " with hostname set to: "
+ dnConf.get("slave.host.name"));
}
if (racks != null) {
String name = hosts[i - curDatanodesNum];
- LOG.info("Adding node with hostname : " + name + " to rack "+
+ System.out.println("Adding node with hostname : " + name + " to rack "+
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,
racks[i-curDatanodesNum]);
@@ -425,7 +417,7 @@
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
if (racks != null) {
int port = dn.getSelfAddr().getPort();
- LOG.info("Adding node with IP:port : " + ipAddr + ":" + port+
+ System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
" to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
@@ -556,8 +548,8 @@
/**
* Shut down the servers that are up.
*/
- public synchronized void shutdown() {
- LOG.info("Shutting down the Mini HDFS Cluster");
+ public void shutdown() {
+ System.out.println("Shutting down the Mini HDFS Cluster");
shutdownDataNodes();
if (nameNode != null) {
nameNode.stop();
@@ -565,57 +557,20 @@
nameNode = null;
}
}
-
- /**
- * Shuts down the cluster.
- *
- * @throws IOException if an I/O error occurs
- */
- public void close() throws IOException {
- shutdown();
- }
-
- /**
- * Static operation to shut down a cluster;
- * harmless if the cluster argument is null
- *
- * @param cluster cluster to shut down, or null for no cluster
- */
- public static void close(Closeable cluster) {
- Service.close(cluster);
- }
-
+
/**
* Shutdown all DataNodes started by this class. The NameNode
* is left running so that new DataNodes may be started.
*/
public void shutdownDataNodes() {
for (int i = dataNodes.size()-1; i >= 0; i--) {
- LOG.info("Shutting down DataNode " + i);
+ System.out.println("Shutting down DataNode " + i);
DataNode dn = dataNodes.remove(i).datanode;
dn.shutdown();
numDataNodes--;
}
}
- /**
- * Returns a string representation of the cluster.
- *
- * @return a string representation of the cluster
- */
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("Cluster up:").append(isClusterUp());
- builder.append("\nName Node:").append(getNameNode());
- builder.append("\nData node count:").append(dataNodes.size());
- for (DataNodeProperties dnp : dataNodes) {
- builder.append("\n Datanode: ").append(dnp.datanode);
- builder.append("\n state: ").append(dnp.datanode.getServiceState());
- }
- return builder.toString();
- }
-
/*
* Corrupt a block on all datanode
*/
@@ -640,15 +595,12 @@
if (blockFile.exists()) {
// Corrupt replica by writing random bytes into replica
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- try {
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int) channel.size() / 2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- } finally {
- raFile.close();
- }
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int)channel.size()/2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ raFile.close();
}
corrupted = true;
}
@@ -664,7 +616,7 @@
}
DataNodeProperties dnprop = dataNodes.remove(i);
DataNode dn = dnprop.datanode;
- LOG.info("MiniDFSCluster Stopping DataNode " +
+ System.out.println("MiniDFSCluster Stopping DataNode " +
dn.dnRegistration.getName() +
" from a total of " + (dataNodes.size() + 1) +
" datanodes.");
@@ -703,10 +655,8 @@
}
}
- /**
+ /*
* Shutdown a datanode by name.
- * @param name datanode name
- * @return true if a node was shut down
*/
public synchronized DataNodeProperties stopDataNode(String name) {
int i;
@@ -720,7 +670,7 @@
}
/**
- * @return true if the NameNode is running and is out of Safe Mode.
+ * Returns true if the NameNode is running and is out of Safe Mode.
*/
public boolean isClusterUp() {
if (nameNode == null) {
@@ -735,7 +685,7 @@
}
/**
- * @return true if there is at least one DataNode running.
+ * Returns true if there is at least one DataNode running.
*/
public boolean isDataNodeUp() {
if (dataNodes == null || dataNodes.size() == 0) {
@@ -746,15 +696,13 @@
/**
* Get a client handle to the DFS cluster.
- * @return a new filesystem, which must be closed when no longer needed.
- * @throws IOException if the filesystem cannot be created
*/
public FileSystem getFileSystem() throws IOException {
return FileSystem.get(conf);
}
/**
- * @return the directories where the namenode stores its state.
+ * Get the directories where the namenode stores its image.
*/
public Collection<File> getNameDirs() {
return FSNamesystem.getNamespaceDirs(conf);
@@ -777,27 +725,17 @@
InetSocketAddress addr = new InetSocketAddress("localhost",
getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
- try {
- DatanodeInfo[] dnInfos;
- // make sure all datanodes are alive
- long timeout=System.currentTimeMillis() + STARTUP_TIMEOUT_MILLIS;
- while((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
- != numDataNodes) {
- try {
- Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
- if(System.currentTimeMillis() > timeout) {
- throw new IOException("Timeout waiting for the datanodes. "+
- "Expected " + numDataNodes + "but got " + dnInfos.length);
- }
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while waiting for the datanodes",e);
- }
+ // make sure all datanodes are alive
+ while(client.datanodeReport(DatanodeReportType.LIVE).length
+ != numDataNodes) {
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
}
- } finally {
- client.close();
}
+ client.close();
}
public void formatDataNodeDirs() throws IOException {
@@ -886,7 +824,7 @@
}
/**
- * @return the current set of datanodes
+ * Returns the current set of datanodes
*/
DataNode[] listDataNodes() {
DataNode[] list = new DataNode[dataNodes.size()];
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Thu Mar 12 19:43:05 2009
@@ -58,7 +58,7 @@
config = new Configuration();
config.set("dfs.name.dir", new File(hdfsDir, "name1").getPath());
FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
- config.set("dfs.http.address", "0.0.0.0:0");
+ config.set("dfs.http.address", NAME_NODE_HTTP_HOST + "0");
NameNode.format(config);
String[] args = new String[] {};
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java Thu Mar 12 19:43:05 2009
@@ -19,6 +19,7 @@
import junit.framework.TestCase;
import java.io.*;
+import java.util.Iterator;
import java.util.Random;
import java.net.*;
@@ -70,7 +71,7 @@
Configuration conf = fileSys.getConf();
ClientProtocol namenode = DFSClient.createNamenode(conf);
- waitForBlockReplication(name.toString(), namenode,
+ waitForBlockReplication(name.toString(), namenode,
Math.min(numDatanodes, repl), -1);
LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
@@ -168,9 +169,9 @@
// Now get block details and check if the block is corrupt
blocks = dfsClient.namenode.
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- LOG.info("Waiting until block is marked as corrupt...");
while (blocks.get(0).isCorrupt() != true) {
try {
+ LOG.info("Waiting until block is marked as corrupt...");
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
@@ -248,14 +249,16 @@
boolean replOk = true;
LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
Long.MAX_VALUE);
-
- for (LocatedBlock block : blocks.getLocatedBlocks()) {
+
+ for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+ iter.hasNext();) {
+ LocatedBlock block = iter.next();
int actual = block.getLocations().length;
- if (actual < expected) {
+ if ( actual < expected ) {
if (true || iters > 0) {
LOG.info("Not enough replicas for " + block.getBlock() +
- " yet. Expecting " + expected + ", got " +
- actual + ".");
+ " yet. Expecting " + expected + ", got " +
+ actual + ".");
}
replOk = false;
break;
@@ -382,8 +385,10 @@
waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
} finally {
- MiniDFSCluster.close(cluster);
- }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu Mar 12 19:43:05 2009
@@ -137,11 +137,6 @@
editLog.close();
- //check that the namesystem is still healthy
- assertNotNull("FSNamesystem.getFSNamesystem() is null",
- FSNamesystem.getFSNamesystem());
- assertNotNull("FSNamesystem.getFSNamesystem().dir is null",
- FSNamesystem.getFSNamesystem().dir);
// Verify that we can read in all the transactions that we have written.
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Thu Mar 12 19:43:05 2009
@@ -93,7 +93,7 @@
config = props;
}
- private ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+ public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
int numDir) throws Exception {
super(numTaskTrackers, namenode, numDir);
}
@@ -121,10 +121,14 @@
* @throws Exception if the cluster could not be stopped
*/
protected void stopCluster() throws Exception {
- MiniMRCluster.close(mrCluster);
- mrCluster = null;
- MiniDFSCluster.close(dfsCluster);
- dfsCluster = null;
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java Thu Mar 12 19:43:05 2009
@@ -163,8 +163,22 @@
* @throws Exception
*/
protected void tearDown() throws Exception {
- MiniMRCluster.close(mrCluster);
- MiniDFSCluster.close(dfsCluster);
+ try {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
+ }
+ try {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
+ }
super.tearDown();
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Mar 12 19:43:05 2009
@@ -19,7 +19,6 @@
import java.io.File;
import java.io.IOException;
-import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -32,13 +31,12 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.Service;
/**
* This class creates a single-process Map-Reduce cluster for junit testing.
* One thread is created for each server.
*/
-public class MiniMRCluster implements Closeable {
+public class MiniMRCluster {
private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
private Thread jobTrackerThread;
@@ -54,13 +52,9 @@
private String namenode;
private UnixUserGroupInformation ugi = null;
- private static final int TRACKER_STABILIZATION_TIMEOUT = 30000;
-
private JobConf conf;
private JobConf job;
- /** time for a tracker to shut down : {@value} */
- private static final long TRACKER_SHUTDOWN_TIMEOUT = 30000;
/**
* An inner class that runs a job tracker.
@@ -107,7 +101,7 @@
tracker = JobTracker.startTracker(jc);
tracker.offerService();
} catch (Throwable e) {
- LOG.error("Job tracker crashed: " + e, e);
+ LOG.error("Job tracker crashed", e);
isActive = false;
}
}
@@ -116,12 +110,13 @@
* Shutdown the job tracker and wait for it to finish.
*/
public void shutdown() {
- JobTracker jobTracker;
- synchronized (this) {
- jobTracker = tracker;
- tracker = null;
+ try {
+ if (tracker != null) {
+ tracker.stopTracker();
+ }
+ } catch (Throwable e) {
+ LOG.error("Problem shutting down job tracker", e);
}
- Service.close(jobTracker);
isActive = false;
}
}
@@ -182,7 +177,7 @@
} catch (Throwable e) {
isDead = true;
tt = null;
- LOG.error("task tracker " + trackerId + " crashed : "+e, e);
+ LOG.error("task tracker " + trackerId + " crashed", e);
}
}
@@ -425,7 +420,7 @@
//Generate rack names if required
if (racks == null) {
- LOG.info("Generating rack names for tasktrackers");
+ System.out.println("Generating rack names for tasktrackers");
racks = new String[numTaskTrackers];
for (int i=0; i < racks.length; ++i) {
racks[i] = NetworkTopology.DEFAULT_RACK;
@@ -434,7 +429,7 @@
//Generate some hostnames if required
if (hosts == null) {
- LOG.info("Generating host names for tasktrackers");
+ System.out.println("Generating host names for tasktrackers");
hosts = new String[numTaskTrackers];
for (int i = 0; i < numTaskTrackers; i++) {
hosts[i] = "host" + i + ".foo.com";
@@ -475,24 +470,6 @@
}
this.job = createJobConf(conf);
- // Wait till the MR cluster stabilizes
- long timeout = System.currentTimeMillis() +
- TRACKER_STABILIZATION_TIMEOUT;
- while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted during startup");
- }
- if(System.currentTimeMillis() > timeout) {
- String message = "Time out waiting for the task trackers to stabilize. "
- + "Expected tracker count: " + numTaskTrackers
- + " -actual count: "
- + jobTracker.tracker.getNumResolvedTaskTrackers();
- LOG.error(message);
- throw new IOException(message);
- }
- }
waitUntilIdle();
}
@@ -606,11 +583,12 @@
* Kill the jobtracker.
*/
public void stopJobTracker() {
+ //jobTracker.exit(-1);
jobTracker.shutdown();
jobTrackerThread.interrupt();
try {
- jobTrackerThread.join(TRACKER_SHUTDOWN_TIMEOUT);
+ jobTrackerThread.join();
} catch (InterruptedException ex) {
LOG.error("Problem waiting for job tracker to finish", ex);
}
@@ -671,25 +649,6 @@
}
}
- /**
- * Static operation to shut down a cluster; harmless if the cluster argument
- * is null
- *
- * @param cluster cluster to shut down, or null for no cluster
- */
- public static void close(Closeable cluster) {
- Service.close(cluster);
- }
-
- /**
- * Shuts down the cluster.
- *
- * @throws IOException if an I/O error occurs
- */
- public void close() throws IOException {
- shutdown();
- }
-
public static void main(String[] args) throws IOException {
LOG.info("Bringing up Jobtracker and tasktrackers.");
MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Thu Mar 12 19:43:05 2009
@@ -35,9 +35,6 @@
*/
public class TestMRServerPorts extends TestCase {
TestHDFSServerPorts hdfs = new TestHDFSServerPorts();
- private static final String STARTED_UNEXPECTEDLY
- = "the Job tracker should not have started";
- private static final String FAILED_TO_START = "The Job tracker did not start";
// Runs the JT in a separate thread
private static class JTRunner extends Thread {
@@ -88,6 +85,7 @@
return false;
throw e;
}
+ jt.fs.close();
jt.stopTracker();
return true;
}
@@ -124,21 +122,21 @@
conf2.set("mapred.job.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
boolean started = canStartJobTracker(conf2);
- assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
+ assertFalse(started); // should fail
// bind http server to the same port as name-node
conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
conf2.set("mapred.job.tracker.http.address",
hdfs.getConfig().get("dfs.http.address"));
started = canStartJobTracker(conf2);
- assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
+ assertFalse(started); // should fail again
// both ports are different from the name-node ones
conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
conf2.set("mapred.job.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
started = canStartJobTracker(conf2);
- assertTrue(FAILED_TO_START, started); // should start now
+ assertTrue(started); // should start now
} finally {
hdfs.stopNameNode(nn);
@@ -165,7 +163,7 @@
conf2.set("mapred.task.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
boolean started = canStartTaskTracker(conf2);
- assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
+ assertFalse(started); // should fail
// bind http server to the same port as name-node
conf2.set("mapred.task.tracker.report.address",
@@ -173,7 +171,7 @@
conf2.set("mapred.task.tracker.http.address",
hdfs.getConfig().get("dfs.http.address"));
started = canStartTaskTracker(conf2);
- assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
+ assertFalse(started); // should fail again
// both ports are different from the name-node ones
conf2.set("mapred.task.tracker.report.address",
@@ -181,7 +179,7 @@
conf2.set("mapred.task.tracker.http.address",
TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
started = canStartTaskTracker(conf2);
- assertTrue(FAILED_TO_START, started); // should start now
+ assertTrue(started); // should start now
} finally {
if (jt != null) {
jt.fs.close();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Thu Mar 12 19:43:05 2009
@@ -46,12 +46,7 @@
private MiniMRCluster mr;
private MiniDFSCluster dfs;
private FileSystem fileSys;
- private static final String BAILING_OUT = "Bailing out";
- private static final String TEST_SCRIPT_BAILING_OUT
- = "Test Script\n"+ BAILING_OUT;
- private static final int SCRIPT_SLEEP_TIMEOUT = 60000;
- private static final int SCRIPT_SLEEP_INTERVAL = 1000;
-
+
/**
* Fail map class
*/
@@ -60,7 +55,7 @@
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
- System.err.println(BAILING_OUT);
+ System.err.println("Bailing out");
throw new IOException();
}
}
@@ -170,17 +165,7 @@
// construct the task id of first map task of failmap
TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
// wait for the job to finish.
- long timeout = System.currentTimeMillis() + SCRIPT_SLEEP_TIMEOUT;
- while (!job.isComplete()) {
- try {
- Thread.sleep(SCRIPT_SLEEP_INTERVAL);
- } catch (InterruptedException e) {
- fail("Interrupted");
- }
- if(System.currentTimeMillis() > timeout) {
- fail("Timeout waiting for the job to complete ");
- }
- }
+ while (!job.isComplete()) ;
// return the output of debugout log.
return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
@@ -219,9 +204,7 @@
outDir,debugDir, debugScript, input);
// Assert the output of debug script.
- if(!result.contains(TEST_SCRIPT_BAILING_OUT)) {
- fail("Did not find " + TEST_SCRIPT_BAILING_OUT + "in \n" + result);
- }
+ assertEquals("Test Script\nBailing out", result);
} finally {
// close file system and shut down dfs and mapred cluster
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Thu Mar 12 19:43:05 2009
@@ -112,11 +112,13 @@
*/
TestRackAwareTaskPlacement.launchJobAndTestCounters(
testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
+ mr.shutdown();
} finally {
fileSys.delete(inDir, true);
fileSys.delete(outputPath, true);
- MiniMRCluster.close(mr);
- MiniDFSCluster.close(dfs);
+ if (dfs != null) {
+ dfs.shutdown();
+ }
}
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Mar 12 19:43:05 2009
@@ -151,11 +151,14 @@
launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
0, 3);
mr.shutdown();
- mr=null;
} finally {
- MiniDFSCluster.close(dfs);
- MiniMRCluster.close(mr);
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ if (mr != null) {
+ mr.shutdown();
+ }
}
}
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Thu Mar 12 19:43:05 2009
@@ -247,13 +247,8 @@
// they were running on a lost tracker
testSetupAndCleanupKill(mr, dfs, false);
} finally {
- try {
- if (dfs != null) { dfs.shutdown(); }
- if (mr != null) { mr.shutdown();
- }
- } catch (OutOfMemoryError e) {
- //ignore this as it means something went very wrong in the test logging
- //any attempt to log it may make things worse
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown();
}
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Thu Mar 12 19:43:05 2009
@@ -81,8 +81,8 @@
runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
mr.waitUntilIdle();
} finally {
- MiniMRCluster.close(mr);
- MiniDFSCluster.close(dfs);
+ mr.shutdown();
+ dfs.shutdown();
}
}