You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 09:31:42 UTC

svn commit: r696957 [2/2] - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 19 00:31:41 2008
@@ -18,6 +18,9 @@
  package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -32,11 +35,13 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.LinkedHashMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +67,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -184,6 +190,8 @@
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
+  volatile JvmManager jvmManager;
+  
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = false;
   private long maxVirtualMemoryForTasks 
@@ -389,7 +397,7 @@
 
     // Clear out state tables
     this.tasks.clear();
-    this.runningTasks = new TreeMap<TaskAttemptID, TaskInProgress>();
+    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.runningJobs = new TreeMap<JobID, RunningJob>();
     this.mapTotal = 0;
     this.reduceTotal = 0;
@@ -422,6 +430,8 @@
     InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
     String bindAddress = socAddr.getHostName();
     int tmpPort = socAddr.getPort();
+    
+    this.jvmManager = new JvmManager(this);
 
     // RPC initialization
     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
@@ -472,6 +482,10 @@
       taskMemoryManager.setDaemon(true);
       taskMemoryManager.start();
     }
+    mapLauncher = new TaskLauncher(maxCurrentMapTasks);
+    reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+    mapLauncher.start();
+    reduceLauncher.start();
     this.running = true;
   }
   
@@ -675,7 +689,6 @@
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Task t = tip.getTask();
-    
     JobID jobId = t.getJobID();
     Path jobFile = new Path(t.getJobFile());
     // Get sizes of JobFile and JarFile
@@ -840,6 +853,14 @@
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     
+    //stop the launchers
+    mapLauncher.cleanTaskQueue();
+    reduceLauncher.cleanTaskQueue();
+    this.mapLauncher.interrupt();
+    this.reduceLauncher.interrupt();
+    
+    jvmManager.stop();
+    
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
   }
@@ -1037,7 +1058,7 @@
         if (actions != null){ 
           for(TaskTrackerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
-              startNewTask((LaunchTaskAction) action);
+              addToTaskQueue((LaunchTaskAction)action);
             } else if (action instanceof CommitTaskAction) {
               CommitTaskAction commitAction = (CommitTaskAction)action;
               if (!commitResponses.contains(commitAction.getTaskID())) {
@@ -1130,8 +1151,8 @@
     boolean askForNewTask;
     long localMinSpaceStart;
     synchronized (this) {
-      askForNewTask = (mapTotal < maxCurrentMapTasks || 
-                       reduceTotal < maxCurrentReduceTasks) &&
+      askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || 
+                       status.countReduceTasks() < maxCurrentReduceTasks) &&
                       acceptNewTasks; 
       localMinSpaceStart = minSpaceStart;
     }
@@ -1161,6 +1182,8 @@
     synchronized (this) {
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
@@ -1192,7 +1215,7 @@
   /**
    * Return the maximum amount of memory available for all tasks on 
    * this tracker
-   * @return maximum amount of virtual memory in kilobytes
+   * @return maximum amount of virtual memory
    */
   long getMaxVirtualMemoryForTasks() {
     return maxVirtualMemoryForTasks;
@@ -1208,7 +1231,7 @@
    * and the total amount of maximum virtual memory that can be
    * used by all currently running tasks.
    * 
-   * @return amount of free virtual memory in kilobytes that can be assured for
+   * @return amount of free virtual memory that can be assured for
    * new tasks
    */
   private synchronized long findFreeVirtualMemory() {
@@ -1224,9 +1247,9 @@
       // still occupied and hence memory of the task should be
       // accounted in used memory.
       if ((tip.getRunState() == TaskStatus.State.RUNNING)
-            || (tip.getRunState() == TaskStatus.State.UNASSIGNED)
+            || (tip.getRunState() == TaskStatus.State.INITIALIZED)
             || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        maxMemoryUsed += getMemoryForTask(tip);
+        maxMemoryUsed += getMemoryForTask(tip.getJobConf());
       }
     }
   
@@ -1239,11 +1262,11 @@
    * If the TIP's job has a configured value for the max memory that is
    * returned. Else, the default memory that would be assigned for the
    * task is returned.
-   * @param tip The TaskInProgress
+   * @param conf
    * @return the memory allocated for the TIP.
    */
-  private long getMemoryForTask(TaskInProgress tip) {
-    long memForTask = tip.getJobConf().getMaxVirtualMemoryForTask();
+  public long getMemoryForTask(JobConf conf) {
+    long memForTask = conf.getMaxVirtualMemoryForTask();
     if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
       memForTask = this.getDefaultMemoryPerTask();
     }
@@ -1278,6 +1301,7 @@
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
+          tip.getRunState() == TaskStatus.State.INITIALIZED ||
           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
@@ -1497,15 +1521,101 @@
       return -1;
     }
   }
+
+  private TaskLauncher mapLauncher;
+  private TaskLauncher reduceLauncher;
+      
+  public JvmManager getJvmManagerInstance() {
+    return jvmManager;
+  }
+
+  public void addFreeMapSlot() {
+    mapLauncher.addFreeSlot();
+  }
   
-  /**
-   * Start a new task.
-   * All exceptions are handled locally, so that we don't mess up the
-   * task tracker.
-   */
-  private void startNewTask(LaunchTaskAction action) {
+  public void addFreeReduceSlot() {
+    reduceLauncher.addFreeSlot();
+  }
+  
+  private void addToTaskQueue(LaunchTaskAction action) {
+    if (action.getTask().isMapTask()) {
+      mapLauncher.addToTaskQueue(action);
+    } else {
+      reduceLauncher.addToTaskQueue(action);
+    }
+  }
+  
+  private class TaskLauncher extends Thread {
+    private IntWritable numFreeSlots;
+    private final int maxSlots;
+    private List<TaskInProgress> tasksToLaunch;
+
+    public TaskLauncher(int numSlots) {
+      this.maxSlots = numSlots;
+      this.numFreeSlots = new IntWritable(numSlots);
+      this.tasksToLaunch = new LinkedList<TaskInProgress>();
+      setDaemon(true);
+      setName("TaskLauncher for task");
+    }
+
+    public void addToTaskQueue(LaunchTaskAction action) {
+      synchronized (tasksToLaunch) {
+        TaskInProgress tip = registerTask(action);
+        tasksToLaunch.add(tip);
+        tasksToLaunch.notifyAll();
+      }
+    }
+    
+    public void cleanTaskQueue() {
+      tasksToLaunch.clear();
+    }
+    
+    public void addFreeSlot() {
+      synchronized (numFreeSlots) {
+        numFreeSlots.set(numFreeSlots.get() + 1);
+        assert (numFreeSlots.get() <= maxSlots);
+        LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
+        numFreeSlots.notifyAll();
+      }
+    }
+    
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          TaskInProgress tip;
+          synchronized (tasksToLaunch) {
+            while (tasksToLaunch.isEmpty()) {
+              tasksToLaunch.wait();
+            }
+            //get the TIP
+            tip = tasksToLaunch.remove(0);
+            LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+          }
+          //wait for a slot to run
+          synchronized (numFreeSlots) {
+            while (numFreeSlots.get() == 0) {
+              numFreeSlots.wait();
+            }
+            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
+                " and trying to launch "+tip.getTask().getTaskID());
+            numFreeSlots.set(numFreeSlots.get() - 1);
+            assert (numFreeSlots.get() >= 0);
+          }
+          
+          //got a free slot. launch the task
+          startNewTask(tip);
+        } catch (InterruptedException e) { 
+          return; // ALL DONE
+        } catch (Throwable th) {
+          LOG.error("TaskLauncher error " + 
+              StringUtils.stringifyException(th));
+        }
+      }
+    }
+  }
+  private TaskInProgress registerTask(LaunchTaskAction action) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction: " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
     TaskInProgress tip = new TaskInProgress(t, this.fConf);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1517,10 +1627,19 @@
         reduceTotal++;
       }
     }
+    return tip;
+  }
+  /**
+   * Start a new task.
+   * All exceptions are handled locally, so that we don't mess up the
+   * task tracker.
+   */
+  private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
       if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip));
+        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
+            getMemoryForTask(tip.getJobConf()));
       }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
@@ -1691,13 +1810,11 @@
         }
         localJobConf.set("hadoop.net.static.resolutions", str.toString());
       }
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        localJobConf.write(out);
-      } finally {
-        out.close();
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
       }
-      task.setConf(localJobConf);
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       if (keepPattern != null) {
         alwaysKeepTaskFiles = 
@@ -1705,11 +1822,21 @@
       } else {
         alwaysKeepTaskFiles = false;
       }
-      if (task.isMapTask()) {
-        debugCommand = localJobConf.getMapDebugScript();
-      } else {
-        debugCommand = localJobConf.getReduceDebugScript();
+      if (debugCommand != null || localJobConf.getProfileEnabled() ||
+          alwaysKeepTaskFiles) {
+        //disable jvm reuse
+        localJobConf.setNumTasksToExecutePerJvm(1);
       }
+      if (isTaskMemoryManagerEnabled()) {
+        localJobConf.setBoolean("task.memory.mgmt.enabled", true);
+      }
+      OutputStream out = localFs.create(localTaskFile);
+      try {
+        localJobConf.write(out);
+      } finally {
+        out.close();
+      }
+      task.setConf(localJobConf);
     }
         
     /**
@@ -1717,6 +1844,10 @@
     public Task getTask() {
       return task;
     }
+    
+    public TaskRunner getTaskRunner() {
+      return runner;
+    }
 
     public synchronized void setJobConf(JobConf lconf){
       this.localJobConf = lconf;
@@ -1745,10 +1876,8 @@
      */
     public synchronized void launchTask() throws IOException {
       localizeTask(task);
-      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
       this.runner = task.createRunner(TaskTracker.this);
       this.runner.start();
-      this.taskStatus.setStartTime(System.currentTimeMillis());
     }
 
     /**
@@ -1816,7 +1945,8 @@
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
-      
+      jvmManager.taskFinished(runner);
+      runner.signalDone();
       LOG.info("Task " + task.getTaskID() + " is done.");
       LOG.info("reported output size for " + task.getTaskID() +  "  was " + taskStatus.getOutputSize());
 
@@ -1857,13 +1987,16 @@
               String jobConf = task.getJobFile();
               try {
                 // get task's stdout file 
-                taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskStdout = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.STDOUT));
                 // get task's stderr file 
-                taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskStderr = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.STDERR));
                 // get task's syslog file 
-                taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskSyslog = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.SYSLOG));
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
@@ -1882,8 +2015,8 @@
                           StringUtils.stringifyException(e));
               }
               // Build the command  
-              File stdout = TaskLog.getTaskLogFile(task.getTaskID(),
-                                                   TaskLog.LogName.DEBUGOUT);
+              File stdout = TaskLog.getRealTaskLogFileLocation(
+                                   task.getTaskID(), TaskLog.LogName.DEBUGOUT);
               // add pipes program as argument if it exists.
               String program ="";
               String executable = Submitter.getExecutable(localJobConf);
@@ -1951,6 +2084,13 @@
       }
 
     }
+    
+    synchronized void taskInitialized() {
+      if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+        //one-way state change to INITIALIZED
+        this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
+      }
+    }
   
 
     /**
@@ -2042,6 +2182,8 @@
       // Kill the task if it is still running
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
+            getRunState() == TaskStatus.State.UNASSIGNED ||
+            getRunState() == TaskStatus.State.INITIALIZED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING) {
           kill(wasFailure);
         }
@@ -2057,12 +2199,12 @@
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+          taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
-        runner.kill();
         taskStatus.setRunState((wasFailure) ? 
                                   TaskStatus.State.FAILED : 
                                   TaskStatus.State.KILLED);
@@ -2074,6 +2216,16 @@
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      if (runner != null) {
+        runner.kill();
+        runner.signalDone();
+      } else {
+        if (task.isMapTask()) {
+          addFreeMapSlot();
+        } else {
+          addFreeReduceSlot();
+        }
+      }
     }
 
     /**
@@ -2112,13 +2264,6 @@
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
 
-      // Remove the associated pid-file, if any
-      if (TaskTracker.this.isTaskMemoryManagerEnabled()) {
-        Path pidFilePath = taskMemoryManager.getPidFilePath(taskId);
-        if (pidFilePath != null) {
-          directoryCleanupThread.addToQueue(pidFilePath);
-        }
-      }
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
@@ -2138,13 +2283,28 @@
                            + task.getJobID() + Path.SEPARATOR + taskId;
           if (needCleanup) {
             if (runner != null) {
+              //cleans up the output directory of the task (where map outputs 
+              //and reduce inputs get stored)
               runner.close();
             }
-            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
-                                                            taskDir));
+            //We don't delete the workdir
+            //since some other task (running in the same JVM) 
+            //might be using the dir. The JVM running the tasks would clean
+            //the workdir per a task in the task process itself.
+            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                  taskDir));
+            }  
+            
+            else {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                taskDir+"/job.xml"));
+            }
           } else {
-            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
-                           taskDir + Path.SEPARATOR + MRConstants.WORKDIR));
+            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                  taskDir+"/work"));
+            }  
           }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: " + 
@@ -2170,16 +2330,54 @@
   // ///////////////////////////////////////////////////////////////
   // TaskUmbilicalProtocol
   /////////////////////////////////////////////////////////////////
+
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized Task getTask(TaskAttemptID taskid) throws IOException {
-    TaskInProgress tip = tasks.get(taskid);
-    if (tip != null) {
-      return tip.getTask();
-    } else {
-      return null;
+  public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId) 
+  throws IOException {
+    LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+    if (!jvmManager.isJvmKnown(jvmId)) {
+      LOG.info("Killing unknown JVM " + jvmId);
+      return new JvmTask(null, true);
+    }
+    RunningJob rjob = runningJobs.get(jvmId.getJobId());
+    if (rjob == null) { //kill the JVM since the job is dead
+      jvmManager.killJvm(jvmId);
+      return new JvmTask(null, true);
+    }
+    TaskInProgress t = runningTasks.get(firstTaskId);
+    //if we can give the JVM the task it is asking for, well and good;
+    //if not, we give it some other task from the same job (note that some
+    //other JVM might have run this task while this JVM was init'ing)
+    if (t == null || t.getStatus().getRunState() != 
+                     TaskStatus.State.INITIALIZED) {
+      boolean isMap = jvmId.isMapJVM();
+      synchronized (rjob) {
+        for (TaskInProgress tip : runningTasks.values()) {
+          synchronized (tip) {
+            if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
+                tip.getRunState() == TaskStatus.State.INITIALIZED
+                && ((isMap && tip.getTask().isMapTask()) ||
+                    (!isMap && !tip.getTask().isMapTask()))) {
+              t = tip;
+            }
+          }
+        }
+      }
     }
+    //now the task could be null or we could have got a task that already
+    //ran earlier (the firstTaskId case)
+    if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
+      jvmManager.setRunningTaskForJvm(jvmId, null);  
+      return new JvmTask(null, false);
+    }
+    t.getStatus().setRunState(TaskStatus.State.RUNNING);
+    t.getStatus().setStartTime(System.currentTimeMillis());
+    jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
+    LOG.info("JVM with ID: " + jvmId + " given task: " + 
+        t.getTask().getTaskID().toString());
+    return new JvmTask(t.getTask(), false);
   }
 
   /**
@@ -2334,6 +2532,13 @@
       taskMemoryManager.removeTask(taskid);
     }
   }
+  
+  synchronized void taskInitialized(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      tip.taskInitialized();
+    }
+  }
 
   /**
    * A completed map task's output has been lost.
@@ -2355,7 +2560,7 @@
     private JobID jobid; 
     private Path jobFile;
     // keep this for later use
-    Set<TaskInProgress> tasks;
+    volatile Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
@@ -2384,61 +2589,6 @@
     }
   }
 
-  /** 
-   * The main() for child processes. 
-   */
-  public static class Child {
-    
-    public static void main(String[] args) throws Throwable {
-      //LogFactory.showTime(false);
-      LOG.debug("Child starting");
-
-      JobConf defaultConf = new JobConf();
-      String host = args[0];
-      int port = Integer.parseInt(args[1]);
-      InetSocketAddress address = new InetSocketAddress(host, port);
-      TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
-      TaskUmbilicalProtocol umbilical =
-        (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-                                            TaskUmbilicalProtocol.versionID,
-                                            address,
-                                            defaultConf);
-            
-      Task task = umbilical.getTask(taskid);
-      JobConf job = new JobConf(task.getJobFile());
-      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
-      task.setConf(job);
-          
-      defaultConf.addResource(new Path(task.getJobFile()));
-      
-      // Initiate Java VM metrics
-      JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
-
-      try {
-        // use job-specified working directory
-        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-        task.run(job, umbilical);             // run the task
-      } catch (FSError e) {
-        LOG.fatal("FSError from child", e);
-        umbilical.fsError(taskid, e.getMessage());
-      } catch (Throwable throwable) {
-        LOG.warn("Error running child", throwable);
-        // Report back any failures, for diagnostic purposes
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        throwable.printStackTrace(new PrintStream(baos));
-        umbilical.reportDiagnosticInfo(taskid, baos.toString());
-      } finally {
-        RPC.stopProxy(umbilical);
-        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-        metricsContext.close();
-        // Shutting down log4j of the child-vm... 
-        // This assumes that on return from Task.run() 
-        // there is no more logging done.
-        LogManager.shutdown();
-      }
-    }
-  }
-
   /**
    * Get the name for this task tracker.
    * @return the string like "tracker_mymachine:50010"
@@ -2804,9 +2954,13 @@
    * Is the TaskMemoryManager Enabled on this system?
    * @return true if enabled, false otherwise.
    */
-  boolean isTaskMemoryManagerEnabled() {
+  public boolean isTaskMemoryManagerEnabled() {
     return taskMemoryManagerEnabled;
   }
+  
+  public TaskMemoryManagerThread getTaskMemoryManager() {
+    return taskMemoryManager;
+  }
 
   private void setTaskMemoryManagerEnabledFlag() {
     if (!ProcfsBasedProcessTree.isAvailable()) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep 19 00:31:41 2008
@@ -208,7 +208,7 @@
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
            (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.COMMIT_PENDING))) {
+           (state == TaskStatus.State.INITIALIZED))) {
         mapCount++;
       }
     }
@@ -226,7 +226,7 @@
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
            (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.COMMIT_PENDING))) {
+           (state == TaskStatus.State.INITIALIZED))) {
         reduceCount++;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep 19 00:31:41 2008
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.JvmTask;
 
 /** Protocol that task child process uses to contact its parent process.  The
  * parent is a daemon which which polls the central master for a new map or
@@ -49,12 +50,19 @@
    * Version 12 getMapCompletionEvents() now also indicates if the events are 
    *            stale or not. Hence the return type is a class that 
    *            encapsulates the events and whether to reset events index.
+   * Version 13 changed the getTask method signature for HADOOP-249
    * */
 
-  public static final long versionID = 11L;
+  public static final long versionID = 13L;
   
-  /** Called when a child task process starts, to get its task.*/
-  Task getTask(TaskAttemptID taskid) throws IOException;
+  /**
+   * Called when a child task process starts, to get its task.
+   * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+   * @param taskid the first taskid that the JVM runs
+   * @return Task object
+   * @throws IOException 
+   */
+  JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException;
 
   /**
    * Report child's progress to parent.

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Sep 19 00:31:41 2008
@@ -72,14 +72,12 @@
     // Run Sort-Validator
     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
   }
-
+  Configuration conf = new Configuration();
   public void testMapReduceSort() throws Exception {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
     try {
-      Configuration conf = new Configuration();
-
       // set io.sort.mb and fsinmemory.size.mb to lower value in test
       conf.setInt("io.sort.mb", 5);
       conf.setInt("fs.inmemory.size.mb", 20);
@@ -103,5 +101,8 @@
       }
     }
   }
-  
+  public void testMapReduceSortWithJvmReuse() throws Exception {
+    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
+    testMapReduceSort();
+  }
 }