Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/12 20:43:06 UTC

svn commit: r752984 [2/2] - in /hadoop/core/trunk: conf/ ivy/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/net/ ...

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 12 19:43:05 2009
@@ -81,12 +81,11 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.Service;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -95,19 +94,13 @@
  * in a networked environment.  It contacts the JobTracker
  * for Task assignments and reporting results.
  *
- *
- * The TaskTracker has a complex lifecycle in that it
- * can be "recycled"; after {@link #closeTaskTracker()} is called,
- * it can be reset using {@link #initialize()}. This is
- * within the {@link Service} lifecycle.
  *******************************************************/
-public class TaskTracker extends Service
+public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
-  /** time to wait for a finished task to be reported as done: {@value}*/
-  private static final long WAIT_FOR_DONE = 3 * 1000;
-  int httpPort;
+  static final long WAIT_FOR_DONE = 3 * 1000;
+  private int httpPort;
 
-  enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+  static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -126,10 +119,7 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  /**
-   * Flag used to synchronize running state across threads.
-   */
-  private volatile boolean running = false;
+  volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -144,7 +134,7 @@
  // last heartbeat response received
   short heartbeatResponseId = -1;
 
-  /**
+  /*
    * This is the last 'status' report sent by this tracker to the JobTracker.
    * 
    * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -154,15 +144,14 @@
    */
   TaskTrackerStatus status = null;
   
-  /** The system-directory on HDFS where job files are stored */
+  // The system-directory on HDFS where job files are stored 
   Path systemDirectory = null;
   
-  /** The filesystem where job files are stored */
+  // The filesystem where job files are stored
   FileSystem systemFS = null;
   
-  private HttpServer server;
+  private final HttpServer server;
     
-  /** Flag used to synchronize startup across threads. */
   volatile boolean shuttingDown = false;
     
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -355,7 +344,33 @@
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private TaskCleanupThread taskCleanupThread;
+  private Thread taskCleanupThread = 
+    new Thread(new Runnable() {
+        public void run() {
+          while (true) {
+            try {
+              TaskTrackerAction action = tasksToCleanup.take();
+              if (action instanceof KillJobAction) {
+                purgeJob((KillJobAction) action);
+              } else if (action instanceof KillTaskAction) {
+                TaskInProgress tip;
+                KillTaskAction killAction = (KillTaskAction) action;
+                synchronized (TaskTracker.this) {
+                  tip = tasks.get(killAction.getTaskID());
+                }
+                LOG.info("Received KillTaskAction for task: " + 
+                         killAction.getTaskID());
+                purgeTask(tip, false);
+              } else {
+                LOG.error("Non-delete action given to cleanup thread: "
+                          + action);
+              }
+            } catch (Throwable except) {
+              LOG.warn(StringUtils.stringifyException(except));
+            }
+          }
+        }
+      }, "taskCleanup");
     
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
@@ -466,24 +481,11 @@
   }
     
   /**
-   * Initialize the connection.
-   * This method will block until a job tracker is found
-   * It's in a separate method
+   * Do the real constructor work here.  It's in a separate method
    * so we can call it again and "recycle" the object after calling
    * close().
    */
   synchronized void initialize() throws IOException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Initializing Task Tracker: " + this);
-    }
-    //check that the server is not already live.
-
-    //allow this operation in only two service states: started and live
-    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
-
-    //flip the running switch for our inner threads
-    running = true;
-    
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
@@ -570,17 +572,10 @@
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
 
-    //mark as just started; this is used in heartbeats
-    this.justStarted = true;
-    int connectTimeout = fConf
-            .getInt("mapred.task.tracker.connect.timeout", 60000);
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
-                       jobTrackAddr, fConf, connectTimeout);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
-    }
+                       jobTrackAddr, this.fConf);
     this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
@@ -616,9 +611,7 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    if (fConf != null) {
-      fConf.deleteLocalFiles();
-    }
+    this.fConf.deleteLocalFiles();
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -897,117 +890,25 @@
     }
   }
     
-  /////////////////////////////////////////////////////
-  // Service Lifecycle
-  /////////////////////////////////////////////////////
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws IOException for any problem.
-   */
-  @Override
-  protected synchronized void innerStart() throws IOException {
-    JobConf conf = fConf;
-    maxCurrentMapTasks = conf.getInt(
-            "mapred.tasktracker.map.tasks.maximum", 2);
-    maxCurrentReduceTasks = conf.getInt(
-            "mapred.tasktracker.reduce.tasks.maximum", 2);
-    this.jobTrackAddr = JobTracker.getAddress(conf);
-    String infoAddr =
-            NetUtils.getServerAddress(conf,
-                    "tasktracker.http.bindAddress",
-                    "tasktracker.http.port",
-                    "mapred.task.tracker.http.address");
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-    String httpBindAddress = infoSocAddr.getHostName();
-    int port = infoSocAddr.getPort();
-    this.server = new HttpServer("task", httpBindAddress, port,
-            port == 0, conf);
-    workerThreads = conf.getInt("tasktracker.http.threads", 40);
-    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
-    server.setThreads(1, workerThreads);
-    // let the jsp pages get to the task tracker, config, and other relevant
-    // objects
-    FileSystem local = FileSystem.getLocal(conf);
-    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
-    server.setAttribute("task.tracker", this);
-    server.setAttribute("local.file.system", local);
-    server.setAttribute("conf", conf);
-    server.setAttribute("log", LOG);
-    server.setAttribute("localDirAllocator", localDirAllocator);
-    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
-    server.addInternalServlet("mapOutput", "/mapOutput",
-            MapOutputServlet.class);
-    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
-    server.start();
-    this.httpPort = server.getPort();
-    initialize();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param status a status that can be updated with problems
-   * @throws IOException       for any problem
-   */
-  @Override
-  public void innerPing(ServiceStatus status) throws IOException {
-    if (server == null || !server.isAlive()) {
-      status.addThrowable(
-              new IOException("TaskTracker HttpServer is not running on port "
-                      + httpPort));
-    }
-    if (taskReportServer == null) {
-      status.addThrowable(
-              new IOException("TaskTracker Report Server is not running on "
-              + taskReportAddress));
-    }
-  }
-  
-  /**
-   * {@inheritDoc}
-   *
-   * @throws IOException exceptions which will be logged
-   */
-  @Override
-  protected void innerClose() throws IOException {
-    synchronized (this) {
-      shuttingDown = true;
-      closeTaskTracker();
-      if (this.server != null) {
-        try {
-          LOG.info("Shutting down StatusHttpServer");
-          this.server.stop();
+  public synchronized void shutdown() throws IOException {
+    shuttingDown = true;
+    close();
+    if (this.server != null) {
+      try {
+        LOG.info("Shutting down StatusHttpServer");
+        this.server.stop();
       } catch (Exception e) {
         LOG.warn("Exception shutting down TaskTracker", e);
-        }
       }
-      stopCleanupThreads();
     }
   }
-
-  /**
-   * A shutdown request triggers termination
-   * @throws IOException when errors happen during termination
-   */
-  public synchronized void shutdown() throws IOException {
-    close();
-  }
-
   /**
    * Close down the TaskTracker and all its components.  We must also shutdown
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
-   * @throws IOException when errors happen during shutdown
    */
-  public synchronized void closeTaskTracker() throws IOException {
-    if (!running) {
-      //this operation is a no-op when not already running
-      return;
-    }
-    running = false;
+  public synchronized void close() throws IOException {
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1019,37 +920,27 @@
       tip.jobHasFinished(false);
     }
     
+    this.running = false;
+        
     // Clear local storage
     cleanupStorage();
         
     // Shutdown the fetcher thread
-    if (mapEventsFetcher != null) {
-      mapEventsFetcher.interrupt();
-    }
+    this.mapEventsFetcher.interrupt();
     
     //stop the launchers
-    if (mapLauncher != null) {
-      mapLauncher.cleanTaskQueue();
-      mapLauncher.interrupt();
-    }
-    if (reduceLauncher != null) {
-      reduceLauncher.cleanTaskQueue();
-      reduceLauncher.interrupt();
-    }
+    this.mapLauncher.interrupt();
+    this.reduceLauncher.interrupt();
+    
+    jvmManager.stop();
     
-    if (jvmManager != null) {
-      jvmManager.stop();
-    }
-      
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
 
     // wait for the fetcher thread to exit
     for (boolean done = false; !done; ) {
       try {
-        if(mapEventsFetcher != null) {
-          mapEventsFetcher.join();
-        }
+        this.mapEventsFetcher.join();
         done = true;
       } catch (InterruptedException e) {
       }
@@ -1062,54 +953,52 @@
   }
 
   /**
-   * Create and start a task tracker.
-   * Subclasses must not subclass this constructor, as it may
-   * call their initialisation/startup methods before the constructor
-   * is complete.
-   * It is here for backwards compatibility.
-   * @param conf configuration
-   * @throws IOException for problems on startup
+   * Start with the local machine name, and the default JobTracker
    */
   public TaskTracker(JobConf conf) throws IOException {
-    this(conf, true);
-  }
-
-  /**
-   * Subclasses should extend this constructor and pass start=false to the
-   * superclass to avoid race conditions in constructors and threads.
-   * @param conf configuration
-   * @param start flag to set to true to start the daemon
-   * @throws IOException for problems on startup
-   */
-  protected TaskTracker(JobConf conf, boolean start) throws IOException {
-    super(conf);
     fConf = conf;
-    //for backwards compatibility, the task tracker starts up unless told not
-    //to. Subclasses should be very cautious about having their superclass
-    //do that as subclassed methods can be invoked before the class is fully
-    //configured
-    if(start) {
-      deploy(this);
-    }
+    maxCurrentMapTasks = conf.getInt(
+                  "mapred.tasktracker.map.tasks.maximum", 2);
+    maxCurrentReduceTasks = conf.getInt(
+                  "mapred.tasktracker.reduce.tasks.maximum", 2);
+    this.jobTrackAddr = JobTracker.getAddress(conf);
+    String infoAddr = 
+      NetUtils.getServerAddress(conf,
+                                "tasktracker.http.bindAddress", 
+                                "tasktracker.http.port",
+                                "mapred.task.tracker.http.address");
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    String httpBindAddress = infoSocAddr.getHostName();
+    int httpPort = infoSocAddr.getPort();
+    this.server = new HttpServer("task", httpBindAddress, httpPort,
+        httpPort == 0, conf);
+    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+    server.setThreads(1, workerThreads);
+    // let the jsp pages get to the task tracker, config, and other relevant
+    // objects
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    server.setAttribute("task.tracker", this);
+    server.setAttribute("local.file.system", local);
+    server.setAttribute("conf", conf);
+    server.setAttribute("log", LOG);
+    server.setAttribute("localDirAllocator", localDirAllocator);
+    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+    server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.start();
+    this.httpPort = server.getPort();
+    initialize();
   }
 
   private void startCleanupThreads() throws IOException {
-    taskCleanupThread = new TaskCleanupThread();
+    taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
   
   /**
-   * Tell the cleanup threads that they should end themselves 
-   */
-  private void stopCleanupThreads() {
-    if (taskCleanupThread != null) {
-      taskCleanupThread.terminate();
-      taskCleanupThread = null;
-    }
-  }
-  
-  /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
    */
@@ -1156,7 +1045,6 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
-    boolean restartingService = true;
 
     while (running && !shuttingDown) {
       try {
@@ -1172,7 +1060,6 @@
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
         if(justInited) {
-          LOG.debug("Checking build version with JobTracker");
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1182,7 +1069,7 @@
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
-              LOG.info("Problem reporting to jobtracker: " + e, e);
+              LOG.info("Problem reporting to jobtracker: " + e);
             }
             return State.DENIED;
           }
@@ -1193,9 +1080,6 @@
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("System directory is " + systemDirectory);
-          }
         }
         
         // Send the heartbeat and process the jobtracker's directives
@@ -1248,15 +1132,6 @@
           return State.STALE;
         }
             
-        //At this point the job tracker is present and compatible,
-        //so the service is coming up.
-        //It is time to declare it as such
-        if (restartingService) {
-          //declare the service as live.
-          enterLiveState();
-          restartingService = false;
-        }
-            
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
@@ -1282,16 +1157,15 @@
             
         //we've cleaned up, resume normal operation
         if (!acceptNewTasks && isIdle()) {
-          LOG.info("ready to accept new tasks again");
           acceptNewTasks=true;
         }
       } catch (InterruptedException ie) {
         LOG.info("Interrupted. Closing down.");
         return State.INTERRUPTED;
       } catch (DiskErrorException de) {
-          String msg = "Exiting task tracker for disk error:\n" +
+        String msg = "Exiting task tracker for disk error:\n" +
           StringUtils.stringifyException(de);
-          LOG.error(msg, de);
+        LOG.error(msg);
         synchronized (this) {
           jobClient.reportTaskTrackerError(taskTrackerName, 
                                            "DiskErrorException", msg);
@@ -1303,9 +1177,10 @@
           LOG.info("Tasktracker disallowed by JobTracker.");
           return State.DENIED;
         }
-      } catch (IOException except) {
-        String msg = "Caught exception: " + except;
-        LOG.error(msg, except);
+      } catch (Exception except) {
+        String msg = "Caught exception: " + 
+          StringUtils.stringifyException(except);
+        LOG.error(msg);
       }
     }
 
@@ -1616,7 +1491,6 @@
       localMinSpaceKill = minSpaceKill;  
     }
     if (!enoughFreeSpace(localMinSpaceKill)) {
-      LOG.info("Tasktracker running out of space -not accepting new tasks");
       acceptNewTasks=false; 
       //we give up! do not accept new tasks until
       //all the ones running have finished and they're all cleared up
@@ -1878,7 +1752,7 @@
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg, e);
+      LOG.warn(msg);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
@@ -1938,22 +1812,17 @@
               if (!shuttingDown) {
                 LOG.info("Lost connection to JobTracker [" +
                          jobTrackAddr + "].  Retrying...", ex);
-                //enter the started state; we are no longer live
-                enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
                 try {
                   Thread.sleep(5000);
                 } catch (InterruptedException ie) {
-                  LOG.info("Interrupted while waiting for the job tracker", ie);
                 }
               }
             }
           }
         } finally {
-          closeTaskTracker();
-        }
-        if (shuttingDown) {
-          return;
+          close();
         }
+        if (shuttingDown) { return; }
         LOG.warn("Reinitializing local state");
         initialize();
       }
@@ -1961,7 +1830,8 @@
         shutdown();
       }
     } catch (IOException iex) {
-      LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
+      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+                StringUtils.stringifyException(iex));
       return;
     }
   }
@@ -2632,20 +2502,6 @@
           }
           String taskDir = getLocalTaskDir(task.getJobID().toString(),
                              taskId.toString(), task.isTaskCleanupTask());
-          CleanupQueue cleaner = directoryCleanupThread;
-          boolean cleanupThread = cleaner == null;
-          if (!cleanupThread) {
-            LOG.info("Cannot clean up: no directory cleanup thread");
-          }
-          if (taskDir == null) {
-            throw new IOException("taskDir==null");
-          }
-          if(localJobConf==null) {
-              throw new IOException("localJobConf==null");
-          }
-          if (defaultJobConf == null) {
-            throw new IOException("defaultJobConf==null");
-          }
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2929,17 +2785,7 @@
   String getName() {
     return taskTrackerName;
   }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the name of this service
-   */
-  @Override
-  public String getServiceName() {
-    return taskTrackerName != null ? taskTrackerName : "Task Tracker";
-  }
-
+    
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                           boolean sendCounters) {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -3056,9 +2902,7 @@
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
         (conf.getBoolean("tasktracker.contention.tracking", false));
-      TaskTracker tracker = new TaskTracker(conf, false);
-      Service.deploy(tracker);
-      tracker.run();
+      new TaskTracker(conf).run();
     } catch (Throwable e) {
       LOG.error("Can not start task tracker because "+
                 StringUtils.stringifyException(e));
@@ -3376,70 +3220,8 @@
       try {
         purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
-        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
       }
     }
   }
-
-  /**
-   * Cleanup queue that can process actions to kill a job or task
-   */
-  private class TaskCleanupThread extends Daemon {
-
-    /**
-     * flag to halt work
-     */
-    private volatile boolean live = true;
-
-
-    /**
-     * Construct a daemon thread.
-     */
-    private TaskCleanupThread() {
-      setName("Task Tracker Task Cleanup Thread");
-    }
-
-    /**
-     * End the daemon. This is done by setting the live flag to false and
-     * interrupting ourselves.
-     */
-    public void terminate() {
-      live = false;
-      interrupt();
-    }
-
-    /**
-     * process task kill actions until told to stop being live.
-     */
-    public void run() {
-      LOG.debug("Task cleanup thread started");
-      while (live) {
-        try {
-          TaskTrackerAction action = tasksToCleanup.take();
-          if (action instanceof KillJobAction) {
-            purgeJob((KillJobAction) action);
-          } else if (action instanceof KillTaskAction) {
-            TaskInProgress tip;
-            KillTaskAction killAction = (KillTaskAction) action;
-            synchronized (TaskTracker.this) {
-              tip = tasks.get(killAction.getTaskID());
-            }
-            LOG.info("Received KillTaskAction for task: " +
-                    killAction.getTaskID());
-            purgeTask(tip, false);
-          } else {
-            LOG.error("Non-delete action given to cleanup thread: "
-                    + action);
-          }
-        } catch (InterruptedException except) {
-          //interrupted. this may have reset the live flag
-        } catch (Throwable except) {
-          LOG.warn("Exception in Cleanup thread: " + except,
-                  except);
-        }
-      }
-      LOG.debug("Task cleanup thread ending");
-    }
-  }
-
 }
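
For context, the cleanup worker reintroduced above is a standard blocking-queue consumer daemon. A minimal standalone sketch of the same pattern, using a plain Runnable in place of Hadoop's TaskTrackerAction dispatch (all names here are illustrative, not Hadoop API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class CleanupWorker {
      // Pending cleanup actions; producers hand work over with submit().
      private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

      // Consumer thread: blocks on take() and survives failures of individual
      // actions, mirroring the catch (Throwable) in the TaskTracker loop above.
      private final Thread worker = new Thread(new Runnable() {
          public void run() {
            while (true) {
              try {
                queue.take().run();
              } catch (Throwable t) {
                System.err.println("Cleanup action failed: " + t);
              }
            }
          }
        }, "cleanup");

      public void start() {
        worker.setDaemon(true);  // must not keep the JVM alive, as in startCleanupThreads()
        worker.start();
      }

      public void submit(Runnable action) throws InterruptedException {
        queue.put(action);
      }
    }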

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 12 19:43:05 2009
@@ -107,15 +107,6 @@
     return actionType;
   }
 
-  /**
-   * {@inheritDoc}
-   * @return the action type.
-   */
-  @Override
-  public String toString() {
-    return "TaskTrackerAction: " + actionType;
-  }
-
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeEnum(out, actionType);
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 12 19:43:05 2009
@@ -301,18 +301,6 @@
   }  
   
   /**
-   * String value prints the basic status of the task tracker
-   * @return a string value for diagnostics
-   */
-  @Override
-  public String toString() {
-    return trackerName
-            + " at http://" + host + ":" + httpPort + "/"
-            + " current task count: " + taskReports.size()
-            + " failed task count: " + failures;
-  }
-  
-  /**
    * Return the {@link ResourceStatus} object configured with this
    * status.
    * 

Modified: hadoop/core/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hadoop-site.xml?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hadoop-site.xml (original)
+++ hadoop/core/trunk/src/test/hadoop-site.xml Thu Mar 12 19:43:05 2009
@@ -11,10 +11,4 @@
 
 
 
-<property>
-  <name>dfs.datanode.ipc.address</name>
-  <value>localhost:50020</value>
-  <description>address for datanodes is always the localhost. This makes for
-  a very fast test setup</description>
-</property>
 </configuration>

Modified: hadoop/core/trunk/src/test/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/log4j.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/log4j.properties (original)
+++ hadoop/core/trunk/src/test/log4j.properties Thu Mar 12 19:43:05 2009
@@ -4,16 +4,4 @@
 log4j.threshhold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %x- %m%n
-
-#this is a logger that prints line numbers; it is unused but can be switched
-#on if desired
-log4j.appender.linenumbers=org.apache.log4j.ConsoleAppender
-log4j.appender.linenumbers.layout=org.apache.log4j.PatternLayout
-log4j.appender.linenumbers.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) %x - %m%n
-
-#log4j.logger.org.apache.hadoop=ERROR
-#log4j.logger.org.apache.hadoop.util.Service=DEBUG
-#log4j.logger.org.smartfrog.services.hadoop=DEBUG
-#log4j.logger.org.apache.hadoop.mapred=DEBUG
-#log4j.logger.org.apache.hadoop.ipc=DEBUG
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
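
For reference, the restored ConversionPattern prints the ISO8601 timestamp, the padded level (%-5p), the last two components of the logger name (%c{2}), and the source location (%F:%M(%L)). A line rendered through it would look roughly like the following (method name and line number are illustrative):

    2009-03-12 19:43:05,123 INFO  mapred.TaskTracker (TaskTracker.java:run(1830)) - Reinitializing local state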

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 12 19:43:05 2009
@@ -79,7 +79,7 @@
 
     FileOutputFormat.setOutputPath(conf, outDir);
 
-    runJob(conf);
+    JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(outDir,
@@ -100,14 +100,4 @@
 
   }
 
-  /**
-   * Run a job, getting the timeout from a system property
-   * @param conf job configuration to use
-   * @throws IOException for any problem, including job failure
-   */
-  private void runJob(JobConf conf) throws IOException {
-    long timeout = Long.getLong("test.jobclient.timeout", 0);
-    JobClient.runJob(conf, timeout);
-  }
-
 }
\ No newline at end of file
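
The revert above goes back to the stock JobClient.runJob(JobConf), which submits a job and blocks until it completes, throwing IOException on failure. A minimal sketch of the call, with illustrative paths and the identity mapper/reducer standing in for real job classes:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.IdentityReducer;

    public class RunJobExample {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(RunJobExample.class);
        conf.setJobName("identity-example");
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        FileInputFormat.setInputPaths(conf, new Path("/in"));    // illustrative input
        FileOutputFormat.setOutputPath(conf, new Path("/out"));  // must not exist yet
        RunningJob job = JobClient.runJob(conf);  // blocks until the job finishes
        System.out.println("successful = " + job.isSuccessful());
      }
    }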

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 12 19:43:05 2009
@@ -64,15 +64,6 @@
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
-  private MiniDFSCluster cluster;
-
-  /**
-   * terminate any non-null cluster
-   */
-  protected void tearDown() throws Exception {
-    super.tearDown();
-    MiniDFSCluster.close(cluster);
-  }
 
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
@@ -278,6 +269,8 @@
   /** copy files from dfs file system to dfs file system */
   public void testCopyFromDfsToDfs() throws Exception {
     String namenode = null;
+    MiniDFSCluster cluster = null;
+    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -298,10 +291,15 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
   }
   
   /** copy files from local file system to dfs file system */
   public void testCopyFromLocalToDfs() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 1, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -321,10 +319,15 @@
         deldir(hdfs, "/logs");
         deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
       }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
   }
 
   /** copy files from dfs file system to local file system */
   public void testCopyFromDfsToLocal() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
       Configuration conf = new Configuration();
       final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
       cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -345,11 +348,16 @@
         deldir(hdfs, "/logs");
         deldir(hdfs, "/srcdat");
       }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
   }
 
   public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
-    Configuration conf = new Configuration();
-    cluster = new MiniDFSCluster(conf, 2, true, null);
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
@@ -400,7 +408,9 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
-
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
   }
 
   public void testCopyDuplication() throws Exception {
@@ -476,9 +486,11 @@
 
   public void testPreserveOption() throws Exception {
     Configuration conf = new Configuration();
-    cluster = new MiniDFSCluster(conf, 2, true, null);
-    String nnUri = FileSystem.getDefaultUri(conf).toString();
-    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      String nnUri = FileSystem.getDefaultUri(conf).toString();
+      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
 
       {//test preserving user
         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -539,15 +551,19 @@
         deldir(fs, "/destdat");
         deldir(fs, "/srcdat");
       }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
     }
+  }
 
   public void testMapCount() throws Exception {
     String namenode = null;
+    MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(conf, 3, true, null);
-      FileSystem fs = cluster.getFileSystem();
+      dfs = new MiniDFSCluster(conf, 3, true, null);
+      FileSystem fs = dfs.getFileSystem();
       final FsShell shell = new FsShell(conf);
       namenode = fs.getUri().toString();
       mr = new MiniMRCluster(3, namenode, 1);
@@ -588,7 +604,8 @@
       assertTrue("Unexpected map count, logs.length=" + logs.length,
           logs.length == 2);
     } finally {
-      MiniMRCluster.close(mr);
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
     }
   }
 
@@ -697,7 +714,9 @@
   }
 
   public void testHftpAccessControl() throws Exception {
-      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
+    MiniDFSCluster cluster = null;
+    try {
+      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
       final UnixUserGroupInformation USER_UGI = createUGI("user", false); 
 
       //start cluster by DFS_UGI
@@ -731,6 +750,9 @@
         fs.setPermission(srcrootpath, new FsPermission((short)0));
         assertEquals(-3, ToolRunner.run(distcp, args));
       }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
   }
 
   /** test -delete */
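
The recurring change in this file wraps each test's MiniDFSCluster in try/finally so the cluster is shut down even when an assertion fails. In outline, each test now has this shape (the body comment stands in for the per-test logic):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public void testOutline() throws Exception {
      MiniDFSCluster cluster = null;
      try {
        Configuration conf = new Configuration();
        cluster = new MiniDFSCluster(conf, 2, true, null);  // 2 datanodes, format dirs
        FileSystem hdfs = cluster.getFileSystem();
        // ... create files, run distcp, assert on the results ...
      } finally {
        if (cluster != null) { cluster.shutdown(); }  // release ports and daemon threads
      }
    }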

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 12 19:43:05 2009
@@ -65,25 +65,25 @@
   private static final long MEGA = 1024 * 1024;
   private static final int SEEKS_PER_FILE = 4;
 
-  private static final String ROOT = System.getProperty("test.build.data","fs_test");
-  private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
-  private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
-  private static final Path READ_DIR = new Path(ROOT, "fs_read");
-  private static final Path DATA_DIR = new Path(ROOT, "fs_data");
+  private static String ROOT = System.getProperty("test.build.data","fs_test");
+  private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
+  private static Path WRITE_DIR = new Path(ROOT, "fs_write");
+  private static Path READ_DIR = new Path(ROOT, "fs_read");
+  private static Path DATA_DIR = new Path(ROOT, "fs_data");
 
   public void testFs() throws Exception {
-    createTestFs(10 * MEGA, 100, 0);
+    testFs(10 * MEGA, 100, 0);
   }
 
-  private static void createTestFs(long megaBytes, int numFiles, long seed)
+  public static void testFs(long megaBytes, int numFiles, long seed)
     throws Exception {
 
     FileSystem fs = FileSystem.get(conf);
 
-    if (seed == 0) {
+    if (seed == 0)
       seed = new Random().nextLong();
-      LOG.info("seed = " + seed);
-    }
+
+    LOG.info("seed = "+seed);
 
     createControlFile(fs, megaBytes, numFiles, seed);
     writeTest(fs, false);
@@ -553,7 +553,7 @@
         }
       }
     } finally {
-      MiniDFSCluster.close(cluster);
+      if (cluster != null) cluster.shutdown(); 
     }
   }
   
@@ -563,44 +563,30 @@
     fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
   }
 
-  public void testCloseFileFS() throws Exception {
-    Configuration conf = new Configuration();
-    new Path("file:///").getFileSystem(conf);
-    UnixUserGroupInformation.login(conf, true);
-    FileSystem.closeAll();
-  }
+  public void testFsClose() throws Exception {
+    {
+      Configuration conf = new Configuration();
+      new Path("file:///").getFileSystem(conf);
+      UnixUserGroupInformation.login(conf, true);
+      FileSystem.closeAll();
+    }
 
-  public void testCloseHftpFS() throws Exception {
+    {
       Configuration conf = new Configuration();
       new Path("hftp://localhost:12345/").getFileSystem(conf);
       UnixUserGroupInformation.login(conf, true);
       FileSystem.closeAll();
-  }
-
-  public void testCloseHftpFSAltLogin() throws Exception {
-    Configuration conf = new Configuration();
-    FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
-    UnixUserGroupInformation.login(fs.getConf(), true);
-    FileSystem.closeAll();
-  }
-
+    }
 
-  public void testCloseHDFS() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
-      URI uri = cluster.getFileSystem().getUri();
-      FileSystem fs = FileSystem.get(uri, new Configuration());
-      checkPath(cluster, fs);
+    {
       Configuration conf = new Configuration();
-      new Path(uri.toString()).getFileSystem(conf);
-      UnixUserGroupInformation.login(conf, true);
+      FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
+      UnixUserGroupInformation.login(fs.getConf(), true);
       FileSystem.closeAll();
-    } finally {
-      MiniDFSCluster.close(cluster);
     }
   }
 
+
   public void testCacheKeysAreCaseInsensitive()
     throws Exception
   {
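
The three close tests above are folded back into a single testFsClose; each block exercises the same FileSystem cache idiom of looking up a cached filesystem for a URI, re-logging in, and then closing every cached instance. In isolation the idiom is roughly:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    FileSystem fs = new Path("file:///").getFileSystem(conf);  // served from the cache
    // ... use fs ...
    FileSystem.closeAll();  // closes every FileSystem instance in the cache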

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Mar 12 19:43:05 2009
@@ -25,14 +25,12 @@
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.io.RandomAccessFile;
-import java.io.Closeable;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -45,18 +43,13 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Service;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * This class creates a single-process DFS cluster for junit testing.
  * The data directories for non-simulated DFS are under the testing directory.
  * For simulated data nodes, no underlying fs storage is used.
  */
-public class MiniDFSCluster implements Closeable {
-  private static final int WAIT_SLEEP_TIME_MILLIS = 500;
-  private static final int STARTUP_TIMEOUT_MILLIS = 30000;
+public class MiniDFSCluster {
 
   public class DataNodeProperties {
     DataNode datanode;
@@ -70,7 +63,6 @@
     }
   }
 
-  private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
   private Configuration conf;
   private NameNode nameNode;
   private int numDataNodes;
@@ -289,18 +281,17 @@
   }
 
   /**
-   * wait for the cluster to get out of
+   * wait for the cluster to get out of 
    * safemode.
    */
   public void waitClusterUp() {
     if (numDataNodes > 0) {
-      try {
-        while (!isClusterUp()) {
-          LOG.warn("Waiting for the Mini HDFS Cluster to start...");
-          Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
+      while (!isClusterUp()) {
+        try {
+          System.err.println("Waiting for the Mini HDFS Cluster to start...");
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
         }
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted during startup", e);
       }
     }
   }
@@ -332,6 +323,7 @@
                              boolean manageDfsDirs, StartupOption operation, 
                              String[] racks, String[] hosts,
                              long[] simulatedCapacities) throws IOException {
+
     int curDatanodesNum = dataNodes.size();
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get("dfs.blockreport.initialDelay") == null) {
@@ -358,7 +350,7 @@
     }
     //Generate some hostnames if required
     if (racks != null && hosts == null) {
-      LOG.info("Generating host names for datanodes");
+      System.out.println("Generating host names for datanodes");
       hosts = new String[numDataNodes];
       for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
         hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -401,16 +393,16 @@
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }
-      LOG.info("Starting DataNode " + i + " with dfs.data.dir: "
+      System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
       if (hosts != null) {
         dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
-        LOG.info("Starting DataNode " + i + " with hostname set to: "
+        System.out.println("Starting DataNode " + i + " with hostname set to: " 
                            + dnConf.get("slave.host.name"));
       }
       if (racks != null) {
         String name = hosts[i - curDatanodesNum];
-        LOG.info("Adding node with hostname : " + name + " to rack "+
+        System.out.println("Adding node with hostname : " + name + " to rack "+
                             racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(name,
                                     racks[i-curDatanodesNum]);
@@ -425,7 +417,7 @@
       String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
       if (racks != null) {
         int port = dn.getSelfAddr().getPort();
-        LOG.info("Adding node with IP:port : " + ipAddr + ":" + port+
+        System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
                             " to rack " + racks[i-curDatanodesNum]);
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
@@ -556,8 +548,8 @@
   /**
    * Shut down the servers that are up.
    */
-  public synchronized void shutdown() {
-    LOG.info("Shutting down the Mini HDFS Cluster");
+  public void shutdown() {
+    System.out.println("Shutting down the Mini HDFS Cluster");
     shutdownDataNodes();
     if (nameNode != null) {
       nameNode.stop();
@@ -565,57 +557,20 @@
       nameNode = null;
     }
   }
-
-  /**
-   * Shuts down the cluster.  
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  public void close() throws IOException {
-    shutdown();
-  }
-
-  /**
-   * Static operation to shut down a cluster;
-   * harmless if the cluster argument is null
-   *
-   * @param cluster cluster to shut down, or null for no cluster
-   */
-  public static void close(Closeable cluster) {
-    Service.close(cluster);
-  }
-
+  
   /**
    * Shutdown all DataNodes started by this class.  The NameNode
    * is left running so that new DataNodes may be started.
    */
   public void shutdownDataNodes() {
     for (int i = dataNodes.size()-1; i >= 0; i--) {
-      LOG.info("Shutting down DataNode " + i);
+      System.out.println("Shutting down DataNode " + i);
       DataNode dn = dataNodes.remove(i).datanode;
       dn.shutdown();
       numDataNodes--;
     }
   }
 
-  /**
-   * Returns a string representation of the cluster.
-   *
-   * @return a string representation of the cluster
-   */
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append("Cluster up:").append(isClusterUp());
-    builder.append("\nName Node:").append(getNameNode());
-    builder.append("\nData node count:").append(dataNodes.size());
-    for (DataNodeProperties dnp : dataNodes) {
-      builder.append("\n Datanode: ").append(dnp.datanode);
-      builder.append("\n  state: ").append(dnp.datanode.getServiceState());
-    }
-    return builder.toString();
-  }
-
   /*
    * Corrupt a block on all datanode
    */
@@ -640,15 +595,12 @@
       if (blockFile.exists()) {
         // Corrupt replica by writing random bytes into replica
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        try {
-          FileChannel channel = raFile.getChannel();
-          String badString = "BADBAD";
-          int rand = random.nextInt((int) channel.size() / 2);
-          raFile.seek(rand);
-          raFile.write(badString.getBytes());
-        } finally {
-          raFile.close();
-        }
+        FileChannel channel = raFile.getChannel();
+        String badString = "BADBAD";
+        int rand = random.nextInt((int)channel.size()/2);
+        raFile.seek(rand);
+        raFile.write(badString.getBytes());
+        raFile.close();
       }
       corrupted = true;
     }
@@ -664,7 +616,7 @@
     }
     DataNodeProperties dnprop = dataNodes.remove(i);
     DataNode dn = dnprop.datanode;
-    LOG.info("MiniDFSCluster Stopping DataNode " +
+    System.out.println("MiniDFSCluster Stopping DataNode " + 
                        dn.dnRegistration.getName() +
                        " from a total of " + (dataNodes.size() + 1) + 
                        " datanodes.");
@@ -703,10 +655,8 @@
     }
   }
 
-  /**
+  /*
    * Shutdown a datanode by name.
-   * @param name datanode name
-   * @return true if a node was shut down
    */
   public synchronized DataNodeProperties stopDataNode(String name) {
     int i;
@@ -720,7 +670,7 @@
   }
   
   /**
-   * @return true if the NameNode is running and is out of Safe Mode.
+   * Returns true if the NameNode is running and is out of Safe Mode.
    */
   public boolean isClusterUp() {
     if (nameNode == null) {
@@ -735,7 +685,7 @@
   }
   
   /**
-   * @return true if there is at least one DataNode running.
+   * Returns true if there is at least one DataNode running.
    */
   public boolean isDataNodeUp() {
     if (dataNodes == null || dataNodes.size() == 0) {
@@ -746,15 +696,13 @@
   
   /**
    * Get a client handle to the DFS cluster.
-   * @return a new filesystem, which must be closed when no longer needed.
-   * @throws IOException if the filesystem cannot be created
    */
   public FileSystem getFileSystem() throws IOException {
     return FileSystem.get(conf);
   }
 
   /**
-   * @return the directories where the namenode stores its state.
+   * Get the directories where the namenode stores its image.
    */
   public Collection<File> getNameDirs() {
     return FSNamesystem.getNamespaceDirs(conf);
@@ -777,27 +725,17 @@
     InetSocketAddress addr = new InetSocketAddress("localhost",
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
-    try {
-      DatanodeInfo[] dnInfos;
 
-      // make sure all datanodes are alive
-      long timeout=System.currentTimeMillis() + STARTUP_TIMEOUT_MILLIS;
-      while((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
-          != numDataNodes) {
-        try {
-          Thread.sleep(WAIT_SLEEP_TIME_MILLIS);
-          if(System.currentTimeMillis() > timeout) {
-            throw new IOException("Timeout waiting for the datanodes. "+
-                    "Expected " + numDataNodes + "but got " + dnInfos.length);
-          }
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted while waiting for the datanodes",e);
-        }
+    // make sure all datanodes are alive
+    while(client.datanodeReport(DatanodeReportType.LIVE).length
+        != numDataNodes) {
+      try {
+        Thread.sleep(500);
+      } catch (Exception e) {
       }
-    } finally {
-      client.close();
     }
 
+    client.close();
   }
   
   public void formatDataNodeDirs() throws IOException {
@@ -886,7 +824,7 @@
   }
 
   /**
-   * @return the current set of datanodes
+   * Returns the current set of datanodes
    */
   DataNode[] listDataNodes() {
     DataNode[] list = new DataNode[dataNodes.size()];
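
waitActive(), as restored above, polls the namenode's live-datanode report every 500 ms with no upper bound. The generic shape of that wait-until idiom, with the report check abstracted into a caller-supplied condition:

    import java.util.concurrent.Callable;

    // Unbounded poll-and-sleep wait, as in the restored waitClusterUp()/waitActive().
    static void waitFor(Callable<Boolean> condition) throws Exception {
      while (!condition.call()) {
        try {
          Thread.sleep(500);  // same back-off as the test cluster
        } catch (InterruptedException e) {
          // ignored, matching the original loops
        }
      }
    }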

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Thu Mar 12 19:43:05 2009
@@ -58,7 +58,7 @@
     config = new Configuration();
     config.set("dfs.name.dir", new File(hdfsDir, "name1").getPath());
     FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
-    config.set("dfs.http.address", "0.0.0.0:0");
+    config.set("dfs.http.address", NAME_NODE_HTTP_HOST + "0");
     NameNode.format(config);
 
     String[] args = new String[] {};

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java Thu Mar 12 19:43:05 2009
@@ -19,6 +19,7 @@
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
 
@@ -70,7 +71,7 @@
     Configuration conf = fileSys.getConf();
     ClientProtocol namenode = DFSClient.createNamenode(conf);
       
-    waitForBlockReplication(name.toString(), namenode,
+    waitForBlockReplication(name.toString(), namenode, 
                             Math.min(numDatanodes, repl), -1);
     
     LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
@@ -168,9 +169,9 @@
     // Now get block details and check if the block is corrupt
     blocks = dfsClient.namenode.
               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-    LOG.info("Waiting until block is marked as corrupt...");
     while (blocks.get(0).isCorrupt() != true) {
       try {
+        LOG.info("Waiting until block is marked as corrupt...");
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
       }
@@ -248,14 +249,16 @@
       boolean replOk = true;
       LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
                                                         Long.MAX_VALUE);
-
-      for (LocatedBlock block : blocks.getLocatedBlocks()) {
+      
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
         int actual = block.getLocations().length;
-        if (actual < expected) {
+        if ( actual < expected ) {
           if (true || iters > 0) {
             LOG.info("Not enough replicas for " + block.getBlock() +
-                    " yet. Expecting " + expected + ", got " +
-                    actual + ".");
+                               " yet. Expecting " + expected + ", got " + 
+                               actual + ".");
           }
           replOk = false;
           break;
@@ -382,8 +385,10 @@
       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
       
     } finally {
-      MiniDFSCluster.close(cluster);
-    }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }  
   }
   
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Thu Mar 12 19:43:05 2009
@@ -137,11 +137,6 @@
     
     editLog.close();
 
-    //check that the namesystem is still healthy
-    assertNotNull("FSNamesystem.getFSNamesystem()  is null",
-            FSNamesystem.getFSNamesystem());
-    assertNotNull("FSNamesystem.getFSNamesystem().dir is null",
-            FSNamesystem.getFSNamesystem().dir);
     // Verify that we can read in all the transactions that we have written.
     // If there were any corruptions, it is likely that the reading in
     // of these transactions will throw an exception.

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Thu Mar 12 19:43:05 2009
@@ -93,7 +93,7 @@
       config = props;
     }
 
-    private ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
+    public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
                                      int numDir) throws Exception {
       super(numTaskTrackers, namenode, numDir);
     }
@@ -121,10 +121,14 @@
    * @throws Exception if the cluster could not be stopped
    */
   protected void stopCluster() throws Exception {
-    MiniMRCluster.close(mrCluster);
-    mrCluster = null;
-    MiniDFSCluster.close(dfsCluster);
-    dfsCluster = null;
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
   }
 
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java Thu Mar 12 19:43:05 2009
@@ -163,8 +163,22 @@
    * @throws Exception
    */
   protected void tearDown() throws Exception {
-    MiniMRCluster.close(mrCluster);
-    MiniDFSCluster.close(dfsCluster);
+    try {
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+      }
+    }
+    catch (Exception ex) {
+      System.out.println(ex);
+    }
+    try {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+    }
+    catch (Exception ex) {
+      System.out.println(ex);
+    }
     super.tearDown();
   }
 

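tearDown() now wraps each shutdown in its own try/catch, so a failure tearing
down the MR cluster can no longer skip the DFS teardown or super.tearDown().
One caveat: System.out.println(ex) records only the exception's toString(). A
hedged variant that keeps the isolation but preserves stack traces (the logger
is an addition of this sketch, not of the patch):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // as it would sit inside HadoopTestCase
    private static final Log LOG = LogFactory.getLog(HadoopTestCase.class);

    protected void tearDown() throws Exception {
      try {
        if (mrCluster != null) {
          mrCluster.shutdown();
        }
      } catch (Exception ex) {
        LOG.warn("Problem shutting down MR cluster", ex);   // keeps the trace
      }
      try {
        if (dfsCluster != null) {
          dfsCluster.shutdown();
        }
      } catch (Exception ex) {
        LOG.warn("Problem shutting down DFS cluster", ex);
      }
      super.tearDown();
    }
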
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Mar 12 19:43:05 2009
@@ -19,7 +19,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -32,13 +31,12 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.Service;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
  * One thread is created for each server.
  */
-public class MiniMRCluster implements Closeable {
+public class MiniMRCluster {
   private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
     
   private Thread jobTrackerThread;
@@ -54,13 +52,9 @@
     
   private String namenode;
   private UnixUserGroupInformation ugi = null;
-  private static final int TRACKER_STABILIZATION_TIMEOUT = 30000;
-
   private JobConf conf;
     
   private JobConf job;
-  /** time for a tracker to shut down : {@value} */
-  private static final long TRACKER_SHUTDOWN_TIMEOUT = 30000;
   
   /**
    * An inner class that runs a job tracker.
@@ -107,7 +101,7 @@
         tracker = JobTracker.startTracker(jc);
         tracker.offerService();
       } catch (Throwable e) {
-        LOG.error("Job tracker crashed: " + e, e);
+        LOG.error("Job tracker crashed", e);
         isActive = false;
       }
     }
@@ -116,12 +110,13 @@
      * Shutdown the job tracker and wait for it to finish.
      */
     public void shutdown() {
-      JobTracker jobTracker;
-      synchronized (this) {
-        jobTracker = tracker;
-        tracker = null;
+      try {
+        if (tracker != null) {
+          tracker.stopTracker();
+        }
+      } catch (Throwable e) {
+        LOG.error("Problem shutting down job tracker", e);
       }
-      Service.close(jobTracker);
       isActive = false;
     }
   }
@@ -182,7 +177,7 @@
       } catch (Throwable e) {
         isDead = true;
         tt = null;
-        LOG.error("task tracker " + trackerId + " crashed : "+e, e);
+        LOG.error("task tracker " + trackerId + " crashed", e);
       }
     }
         
@@ -425,7 +420,7 @@
      
      //Generate rack names if required
      if (racks == null) {
-       LOG.info("Generating rack names for tasktrackers");
+       System.out.println("Generating rack names for tasktrackers");
        racks = new String[numTaskTrackers];
        for (int i=0; i < racks.length; ++i) {
          racks[i] = NetworkTopology.DEFAULT_RACK;
@@ -434,7 +429,7 @@
      
     //Generate some hostnames if required
     if (hosts == null) {
-      LOG.info("Generating host names for tasktrackers");
+      System.out.println("Generating host names for tasktrackers");
       hosts = new String[numTaskTrackers];
       for (int i = 0; i < numTaskTrackers; i++) {
         hosts[i] = "host" + i + ".foo.com";
@@ -475,24 +470,6 @@
     }
     
     this.job = createJobConf(conf);
-    // Wait till the MR cluster stabilizes
-    long timeout = System.currentTimeMillis() +
-            TRACKER_STABILIZATION_TIMEOUT;
-    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException ie) {
-        throw new IOException("Interrupted during startup");
-      }
-      if(System.currentTimeMillis() > timeout) {
-        String message = "Time out waiting for the task trackers to stabilize. "
-                + "Expected tracker count: " + numTaskTrackers
-                + "  -actual count: "
-                + jobTracker.tracker.getNumResolvedTaskTrackers();
-        LOG.error(message);
-        throw new IOException(message);
-      }
-    }
     waitUntilIdle();
   }
     
@@ -606,11 +583,12 @@
    * Kill the jobtracker.
    */
   public void stopJobTracker() {
+    //jobTracker.exit(-1);
     jobTracker.shutdown();
 
     jobTrackerThread.interrupt();
     try {
-      jobTrackerThread.join(TRACKER_SHUTDOWN_TIMEOUT);
+      jobTrackerThread.join();
     } catch (InterruptedException ex) {
       LOG.error("Problem waiting for job tracker to finish", ex);
     }
@@ -671,25 +649,6 @@
     }
   }
     
-  /**
-   * Static operation to shut down a cluster; harmless if the cluster argument
-   * is null
-   *
-   * @param cluster cluster to shut down, or null for no cluster
-   */
-  public static void close(Closeable cluster) {
-    Service.close(cluster);
-  }
-
-  /**
-   * Shuts down the cluster.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  public void close() throws IOException {
-    shutdown();
-  }
-    
   public static void main(String[] args) throws IOException {
     LOG.info("Bringing up Jobtracker and tasktrackers.");
     MiniMRCluster mr = new MiniMRCluster(4, "file:///", 1);

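With the stabilization loop removed, the MiniMRCluster constructor now returns
once waitUntilIdle() is satisfied, without waiting for every task tracker to
resolve against the job tracker. A test that still needs all trackers
registered can poll for it directly; a sketch built on the
getNumResolvedTaskTrackers() accessor the removed code used (the helper and
its timeout are assumptions of this sketch):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobTracker;

    /** Polls until 'expected' trackers resolve, or fails after timeoutMs. */
    static void waitForTrackers(JobTracker tracker, int expected, long timeoutMs)
        throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (tracker.getNumResolvedTaskTrackers() != expected) {
        if (System.currentTimeMillis() > deadline) {
          throw new IOException("Expected " + expected + " task trackers, got "
              + tracker.getNumResolvedTaskTrackers());
        }
        Thread.sleep(50);   // same polling interval the removed loop used
      }
    }
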
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Thu Mar 12 19:43:05 2009
@@ -35,9 +35,6 @@
  */
 public class TestMRServerPorts extends TestCase {
   TestHDFSServerPorts hdfs = new TestHDFSServerPorts();
-  private static final String STARTED_UNEXPECTEDLY
-          = "the Job tracker should not have started";
-  private static final String FAILED_TO_START = "The Job tracker did not start";
 
   // Runs the JT in a separate thread
   private static class JTRunner extends Thread {
@@ -88,6 +85,7 @@
         return false;
       throw e;
     }
+    jt.fs.close();
     jt.stopTracker();
     return true;
   }
@@ -124,21 +122,21 @@
       conf2.set("mapred.job.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartJobTracker(conf2);
-      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
+      assertFalse(started); // should fail
 
       // bind http server to the same port as name-node
       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
       conf2.set("mapred.job.tracker.http.address",
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartJobTracker(conf2);
-      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
+      assertFalse(started); // should fail again
 
       // both ports are different from the name-node ones
       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
       conf2.set("mapred.job.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartJobTracker(conf2);
-      assertTrue(FAILED_TO_START, started); // should start now
+      assertTrue(started); // should start now
 
     } finally {
       hdfs.stopNameNode(nn);
@@ -165,7 +163,7 @@
       conf2.set("mapred.task.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartTaskTracker(conf2);
-      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail
+      assertFalse(started); // should fail
 
       // bind http server to the same port as name-node
       conf2.set("mapred.task.tracker.report.address",
@@ -173,7 +171,7 @@
       conf2.set("mapred.task.tracker.http.address",
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartTaskTracker(conf2);
-      assertFalse(STARTED_UNEXPECTEDLY, started); // should fail again
+      assertFalse(started); // should fail again
 
       // both ports are different from the name-node ones
       conf2.set("mapred.task.tracker.report.address",
@@ -181,7 +179,7 @@
       conf2.set("mapred.task.tracker.http.address",
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartTaskTracker(conf2);
-      assertTrue(FAILED_TO_START, started); // should start now
+      assertTrue(started); // should start now
     } finally {
       if (jt != null) {
         jt.fs.close();

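The assertions above drive the trackers through three configurations: two that
collide with ports the name-node already holds (and must fail to start) and a
third with free ports (which must come up). A minimal sketch of the underlying
bind-conflict pattern, using only java.net rather than anything from this
patch:

    import java.io.IOException;
    import java.net.BindException;
    import java.net.ServerSocket;

    /** True when the port can be bound; false when something already owns it. */
    static boolean canBind(int port) throws IOException {
      ServerSocket socket = null;
      try {
        socket = new ServerSocket(port);
        return true;
      } catch (BindException e) {
        return false;       // the "should fail" branches expect this outcome
      } finally {
        if (socket != null) {
          socket.close();
        }
      }
    }
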
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Thu Mar 12 19:43:05 2009
@@ -46,12 +46,7 @@
   private MiniMRCluster mr;
   private MiniDFSCluster dfs;
   private FileSystem fileSys;
-  private static final String BAILING_OUT = "Bailing out";
-  private static final String TEST_SCRIPT_BAILING_OUT
-          = "Test Script\n"+ BAILING_OUT;
-  private static final int SCRIPT_SLEEP_TIMEOUT = 60000;
-  private static final int SCRIPT_SLEEP_INTERVAL = 1000;
-
+
   /**
    * Fail map class 
    */
@@ -60,7 +55,7 @@
      public void map (LongWritable key, Text value, 
                      OutputCollector<Text, IntWritable> output, 
                      Reporter reporter) throws IOException {
-       System.err.println(BAILING_OUT);
+       System.err.println("Bailing out");
        throw new IOException();
      }
   }
@@ -170,17 +165,7 @@
     // construct the task id of first map task of failmap
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
     // wait for the job to finish.
-    long timeout = System.currentTimeMillis() + SCRIPT_SLEEP_TIMEOUT;
-    while (!job.isComplete()) {
-      try {
-          Thread.sleep(SCRIPT_SLEEP_INTERVAL);
-        } catch (InterruptedException e) {
-          fail("Interrupted");
-        }
-        if(System.currentTimeMillis() > timeout) {
-          fail("Timeout waiting for the job to complete ");
-      }
-    }
+    while (!job.isComplete()) ;
     
     // return the output of debugout log.
     return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId);
@@ -219,9 +204,7 @@
                                outDir,debugDir, debugScript, input);
       
       // Assert the output of debug script.
-      if(!result.contains(TEST_SCRIPT_BAILING_OUT)) {
-        fail("Did not find " + TEST_SCRIPT_BAILING_OUT + "in \n" + result);
-      }
+      assertEquals("Test Script\nBailing out", result);
 
     } finally {  
       // close file system and shut down dfs and mapred cluster

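The restored wait, "while (!job.isComplete()) ;", spins the calling thread at
full speed until the job finishes. A hedged alternative that keeps the
unbounded wait but yields between polls (isComplete() is the same RunningJob
call the test makes; the one-second interval is an assumption):

    // inside a test method that, like this one, declares throws IOException
    while (!job.isComplete()) {
      try {
        Thread.sleep(1000);   // poll once a second instead of busy-waiting
      } catch (InterruptedException e) {
        fail("Interrupted while waiting for the job to complete");
      }
    }
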
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Thu Mar 12 19:43:05 2009
@@ -112,11 +112,13 @@
        */
       TestRackAwareTaskPlacement.launchJobAndTestCounters(
     		  testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
+      mr.shutdown();
     } finally {
       fileSys.delete(inDir, true);
       fileSys.delete(outputPath, true);
-      MiniMRCluster.close(mr);
-      MiniDFSCluster.close(dfs);
+      if (dfs != null) {
+        dfs.shutdown();
+      }
     }
   }
 }

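Note the asymmetry this hunk leaves behind: mr.shutdown() runs only on the
success path inside the try block, while dfs.shutdown() runs from finally, so
a throw from launchJobAndTestCounters() leaves the MR cluster up. A
hypothetical helper (not part of this patch) that a finally block could call
to cover both clusters, tolerating ones that never started:

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.mapred.MiniMRCluster;

    /** Shuts down whichever clusters actually came up. */
    static void shutdownClusters(MiniMRCluster mr, MiniDFSCluster dfs) {
      if (mr != null) {
        mr.shutdown();
      }
      if (dfs != null) {
        dfs.shutdown();
      }
    }
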
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Mar 12 19:43:05 2009
@@ -151,11 +151,14 @@
       launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
                                0, 3);
       mr.shutdown();
-      mr=null;
       
     } finally {
-      MiniDFSCluster.close(dfs);
-      MiniMRCluster.close(mr);
+      if (dfs != null) { 
+        dfs.shutdown(); 
+      }
+      if (mr != null) { 
+        mr.shutdown();
+      }
     }
   }
   static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 

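One subtlety here: with the "mr = null" assignment dropped, a successful run
now reaches mr.shutdown() twice, once at the end of the try block and again
via the null check in finally. If MiniMRCluster.shutdown() is not idempotent,
the old single-shutdown idiom is easy to keep (a sketch, not what the patch
does):

    mr.shutdown();
    mr = null;   // finally's null check then skips the second shutdown
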
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Thu Mar 12 19:43:05 2009
@@ -247,13 +247,8 @@
       // they were running on a lost tracker
       testSetupAndCleanupKill(mr, dfs, false);
     } finally {
-      try {
-        if (dfs != null) { dfs.shutdown(); }
-        if (mr != null) { mr.shutdown();
-        }
-      } catch (OutOfMemoryError e) {
-        //ignore this as it means something went very wrong in the test logging
-        //any attempt to log it may make things worse
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
       }
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Thu Mar 12 19:43:05 2009
@@ -81,8 +81,8 @@
       runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
       mr.waitUntilIdle();
     } finally {
-      MiniMRCluster.close(mr);
-      MiniDFSCluster.close(dfs);
+      mr.shutdown();
+      dfs.shutdown();
     }
   }
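Unlike the other teardown blocks in this commit, this finally calls
mr.shutdown() and dfs.shutdown() with no null checks, so a failure during
cluster startup would surface as a NullPointerException from the teardown
itself, and a throw from the first call would skip the second. A hedged sketch
of a hardened form of the same block (illustrative only; the patch keeps the
simple form):

    } finally {
      try {
        if (mr != null) {
          mr.shutdown();
        }
      } finally {
        if (dfs != null) {
          dfs.shutdown();    // runs even if the MR teardown throws
        }
      }
    }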