You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/01 01:15:01 UTC

svn commit: r700628 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/

Author: acmurthy
Date: Tue Sep 30 16:15:00 2008
New Revision: 700628

URL: http://svn.apache.org/viewvc?rev=700628&view=rev
Log:
HADOOP-4232. Fix race condition in JVM reuse when multiple slots become free. Contributed by Devaraj Das.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 30 16:15:00 2008
@@ -813,6 +813,9 @@
 
     HADOOP-4309. Fix eclipse-plugin compilation. (cdouglas)
 
+    HADOOP-4232. Fix race condition in JVM reuse when multiple slots become
+    free. (ddas via acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Tue Sep 30 16:15:00 2008
@@ -56,11 +56,10 @@
     int port = Integer.parseInt(args[1]);
     InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    final int SLEEP_LONGER_COUNT = 5;
     taskid = firstTaskid;
     int jvmIdInt = Integer.parseInt(args[3]);
     JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
-    final int MAX_SLEEP_COUNT = 600; //max idle time of 5 minutes
-    int sleepCount = 0;
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
           TaskUmbilicalProtocol.versionID,
@@ -75,7 +74,9 @@
         while (true) {
           try {
             Thread.sleep(5000);
-            TaskLog.syncLogs(firstTaskid, taskid);
+            if (taskid != null) {
+              TaskLog.syncLogs(firstTaskid, taskid);
+            }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
             LOG.error("Error in syncLogs: " + iee);
@@ -93,22 +94,26 @@
     //manager to use JVMId instead of TaskAttemptId
     Path srcPidPath = null;
     Path dstPidPath = null;
+    int idleLoopCount = 0;
     try {
       while (true) {
-        JvmTask myTask = umbilical.getTask(jvmId, firstTaskid);
+        JvmTask myTask = umbilical.getTask(jvmId);
         if (myTask.shouldDie()) {
           break;
         } else {
           if (myTask.getTask() == null) {
-            if (sleepCount == MAX_SLEEP_COUNT) {
-              System.exit(0);
+            taskid = null;
+            if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
+              //we sleep for a bigger interval when we don't receive
+              //tasks for a while
+              Thread.sleep(1500);
+            } else {
+              Thread.sleep(500);
             }
-            sleepCount++;
-            Thread.sleep(500);
             continue;
           }
-          sleepCount = 0; //got a task. reset the sleepCount
         }
+        idleLoopCount = 0;
         Task task = myTask.getTask();
         taskid = task.getTaskID();
         

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Sep 30 16:15:00 2008
@@ -48,10 +48,12 @@
    * Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
    * Version 17: Changed format of Task and TaskStatus for HADOOP-3150
    * Version 18: Changed status message due to changes in TaskStatus
-   *             Changed heartbeat to piggyback JobTracker restart information
+   * Version 19: Changed heartbeat to piggyback JobTracker restart information
                  so that the TaskTracker can synchronize itself.
+   * Version 20: Changed status message due to changes in TaskStatus
+   *             (HADOOP-4232)
    */
-  public static final long versionID = 18L;
+  public static final long versionID = 20L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Sep 30 16:15:00 2008
@@ -59,7 +59,7 @@
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) throws IOException {
+    public JvmTask getTask(JVMId jvmId) throws IOException {
       return null;
     }
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 30 16:15:00 2008
@@ -1433,8 +1433,7 @@
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
-            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }
@@ -1444,8 +1443,7 @@
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
-            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Sep 30 16:15:00 2008
@@ -29,9 +29,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 class JvmManager {
@@ -72,22 +71,21 @@
     }
   }
 
-  public void launchJvm(JobID jobId, boolean isMap, JvmEnv env) {
-    if (isMap) {
-      mapJvmManager.reapJvm(env, jobId, tracker);
+  public void launchJvm(TaskRunner t, JvmEnv env) {
+    if (t.getTask().isMapTask()) {
+      mapJvmManager.reapJvm(t, tracker, env);
     } else {
-      reduceJvmManager.reapJvm(env, jobId, tracker);
+      reduceJvmManager.reapJvm(t, tracker, env);
     }
   }
 
-  public void setRunningTaskForJvm(JVMId jvmId, TaskRunner t) {
+  public TaskInProgress getTaskForJvm(JVMId jvmId) {
     if (jvmId.isMapJVM()) {
-      mapJvmManager.setRunningTaskForJvm(jvmId, t);
+      return mapJvmManager.getTaskForJvm(jvmId);
     } else {
-      reduceJvmManager.setRunningTaskForJvm(jvmId, t);
+      return reduceJvmManager.getTaskForJvm(jvmId);
     }
   }
-
   public void taskFinished(TaskRunner tr) {
     if (tr.getTask().isMapTask()) {
       mapJvmManager.taskFinished(tr);
@@ -136,17 +134,18 @@
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
         TaskRunner t) {
-      if (t == null) { 
-        //signifies the JVM asked for a task and it 
-        //was not given anything.
-        jvmIdToRunner.get(jvmId).setBusy(false);
-        return;
-      }
       jvmToRunningTask.put(jvmId, t);
       runningTaskToJvm.put(t,jvmId);
       jvmIdToRunner.get(jvmId).setBusy(true);
     }
     
+    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+      if (jvmToRunningTask.containsKey(jvmId)) {
+        return jvmToRunningTask.get(jvmId).getTaskInProgress();
+      }
+      return null;
+    }
+    
     synchronized public boolean isJvmknown(JVMId jvmId) {
       return jvmIdToRunner.containsKey(jvmId);
     }
@@ -187,23 +186,34 @@
       jvmIdToRunner.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        JvmEnv env,
-        JobID jobId, TaskTracker tracker) {
+        TaskRunner t, TaskTracker tracker, JvmEnv env) {
       boolean spawnNewJvm = false;
+      JobID jobId = t.getTask().getJobID();
       //Check whether there is a free slot to start a new JVM.
       //,or, Kill a (idle) JVM and launch a new one
+      //When this method is called, we *must* 
+      // (1) spawn a new JVM (if we are below the max) 
+      // (2) find an idle JVM (that belongs to the same job), or,
+      // (3) kill an idle JVM (from a different job) 
+      // (the order of return is in the order above)
       int numJvmsSpawned = jvmIdToRunner.size();
-
+      JvmRunner runnerToKill = null;
       if (numJvmsSpawned >= maxJvms) {
         //go through the list of JVMs for all jobs.
-        //for each JVM see whether it is currently running something and
-        //if not, then kill the JVM
         Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
           jvmIdToRunner.entrySet().iterator();
         
         while (jvmIter.hasNext()) {
           JvmRunner jvmRunner = jvmIter.next().getValue();
           JobID jId = jvmRunner.jvmId.getJobId();
+          //look for a free JVM for this job; if one exists then just break
+          if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
+            setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
+            LOG.info("No new JVM spawned for jobId/taskid: " + 
+                     jobId+"/"+t.getTask().getTaskID() +
+                     ". Attempting to reuse: " + jvmRunner.jvmId);
+            return;
+          }
           //Cases when a JVM is killed: 
           // (1) the JVM under consideration belongs to the same job 
           //     (passed in the argument). In this case, kill only when
@@ -211,13 +221,12 @@
           //     of count).
           // (2) the JVM under consideration belongs to a different job and is
           //     currently not busy
-          //             
+          //But in both the above cases, we see if we can assign the current
+          //task to an idle JVM (hence we continue the loop even on a match)
           if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
               (!jId.equals(jobId) && !jvmRunner.isBusy())) {
-            jvmIter.remove();
-            jvmRunner.kill();
+            runnerToKill = jvmRunner;
             spawnNewJvm = true;
-            break;
           }
         }
       } else {
@@ -225,13 +234,42 @@
       }
 
       if (spawnNewJvm) {
-        spawnNewJvm(jobId, env, tracker);
-      } else {
-        LOG.info("No new JVM spawned for jobId: " + jobId);
+        if (runnerToKill != null) {
+          LOG.info("Killing JVM: " + runnerToKill.jvmId);
+          runnerToKill.kill();
+        }
+        spawnNewJvm(jobId, env, tracker, t);
+        return;
       }
+      //*MUST* never reach this
+      throw new RuntimeException("Inconsistent state!!! " +
+      		"JVM Manager reached an unstable state " +
+            "while reaping a JVM for task: " + t.getTask().getTaskID()+
+            " " + getDetails());
+    }
+    
+    private String getDetails() {
+      StringBuffer details = new StringBuffer();
+      details.append("Number of active JVMs:").
+              append(jvmIdToRunner.size());
+      Iterator<JVMId> jvmIter = 
+        jvmIdToRunner.keySet().iterator();
+      while (jvmIter.hasNext()) {
+        JVMId jvmId = jvmIter.next();
+        details.append("\n  JVMId ").
+          append(jvmId.toString()).
+          append(" #Tasks ran: "). 
+          append(jvmIdToRunner.get(jvmId).numTasksRan).
+          append(" Currently busy? ").
+          append(jvmIdToRunner.get(jvmId).busy).
+          append(" Currently running: "). 
+          append(jvmToRunningTask.get(jvmId).getTask().getTaskID().toString());
+      }
+      return details.toString();
     }
 
-    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker) {
+    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker, 
+        TaskRunner t) {
       JvmRunner jvmRunner = new JvmRunner(env,jobId);
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
       //spawn the JVM in a new thread. Note that there will be very little
@@ -247,6 +285,7 @@
             TaskAttemptID.forName(env.conf.get("mapred.task.id")),
             tracker.getMemoryForTask(env.conf));
       }
+      setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());
       jvmRunner.start();
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Sep 30 16:15:00 2008
@@ -207,7 +207,7 @@
 
     // TaskUmbilicalProtocol methods
 
-    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) { return null; }
+    public JvmTask getTask(JVMId jvmId) { return null; }
 
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep 30 16:15:00 2008
@@ -111,8 +111,9 @@
   }
   
   @Override
-  public TaskRunner createRunner(TaskTracker tracker) {
-    return new MapTaskRunner(this, tracker, this.conf);
+  public TaskRunner createRunner(TaskTracker tracker, 
+      TaskTracker.TaskInProgress tip) {
+    return new MapTaskRunner(tip, tracker, this.conf);
   }
 
   @Override

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Tue Sep 30 16:15:00 2008
@@ -19,10 +19,12 @@
 
 import java.io.*;
 
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
 
-  public MapTaskRunner(Task task, TaskTracker tracker, JobConf conf) {
+  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
     super(task, tracker, conf);
   }
   

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 30 16:15:00 2008
@@ -70,6 +70,7 @@
 import org.apache.hadoop.mapred.IFile.*;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -161,8 +162,9 @@
   }
 
   @Override
-  public TaskRunner createRunner(TaskTracker tracker) throws IOException {
-    return new ReduceTaskRunner(this, tracker, this.conf);
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) 
+  throws IOException {
+    return new ReduceTaskRunner(tip, tracker, this.conf);
   }
 
   @Override

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Sep 30 16:15:00 2008
@@ -19,10 +19,12 @@
 
 import java.io.*;
 
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
   
-  public ReduceTaskRunner(Task task, TaskTracker tracker, 
+  public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
     
     super(task, tracker, conf);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Sep 30 16:15:00 2008
@@ -303,9 +303,10 @@
     throws IOException;
 
 
-  /** Return an approprate thread runner for this task. */
-  public abstract TaskRunner createRunner(TaskTracker tracker
-                                          ) throws IOException;
+  /** Return an appropriate thread runner for this task. 
+   * @param tip the TaskTracker.TaskInProgress the runner is created for */
+  public abstract TaskRunner createRunner(TaskTracker tracker, 
+      TaskTracker.TaskInProgress tip) throws IOException;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Sep 30 16:15:00 2008
@@ -486,7 +486,6 @@
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
            newState != TaskStatus.State.COMMIT_PENDING && 
-           newState != TaskStatus.State.INITIALIZED &&
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
@@ -499,8 +498,7 @@
       // to running. This is a spot fix, but it should be addressed more
       // globally.
       if ((newState == TaskStatus.State.RUNNING || 
-          newState == TaskStatus.State.UNASSIGNED ||
-          newState == TaskStatus.State.INITIALIZED) &&
+          newState == TaskStatus.State.UNASSIGNED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
            oldState == TaskStatus.State.SUCCEEDED ||
@@ -713,7 +711,6 @@
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
         || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
-        st.getRunState() == TaskStatus.State.INITIALIZED ||
         st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue Sep 30 16:15:00 2008
@@ -42,6 +42,7 @@
     LogFactory.getLog(TaskRunner.class);
 
   volatile boolean killed = false;
+  private TaskTracker.TaskInProgress tip;
   private Task t;
   private Object lock = new Object();
   private volatile boolean done = false;
@@ -58,8 +59,10 @@
    */
   protected MapOutputFile mapOutputFile;
 
-  public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
-    this.t = t;
+  public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
+      JobConf conf) {
+    this.tip = tip;
+    this.t = tip.getTask();
     this.tracker = tracker;
     this.conf = conf;
     this.mapOutputFile = new MapOutputFile(t.getJobID());
@@ -68,6 +71,7 @@
   }
 
   public Task getTask() { return t; }
+  public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
   public TaskTracker getTracker() { return tracker; }
 
   /** Called to assemble this task's input.  This method is run in the parent
@@ -403,9 +407,7 @@
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-      tracker.taskInitialized(t.getTaskID());
-      LOG.info("Task ID: " + t.getTaskID() +" initialized");
-      jvmManager.launchJvm(t.getJobID(), t.isMapTask(), 
+      jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
               workDir, env, pidFile, conf));
       synchronized (lock) {
@@ -537,6 +539,7 @@
   public void kill() {
     killed = true;
     jvmManager.taskKilled(this);
+    signalDone();
   }
   public void signalDone() {
     synchronized (lock) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Tue Sep 30 16:15:00 2008
@@ -41,7 +41,7 @@
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING, INITIALIZED}
+                            COMMIT_PENDING}
     
   private TaskAttemptID taskid;
   private float progress;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 30 16:15:00 2008
@@ -72,6 +72,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -1186,7 +1187,6 @@
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
@@ -1250,7 +1250,6 @@
       // still occupied and hence memory of the task should be
       // accounted in used memory.
       if ((tip.getRunState() == TaskStatus.State.RUNNING)
-            || (tip.getRunState() == TaskStatus.State.INITIALIZED)
             || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
         maxMemoryUsed += getMemoryForTask(tip.getJobConf());
       }
@@ -1304,7 +1303,6 @@
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
-          tip.getRunState() == TaskStatus.State.INITIALIZED ||
           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
@@ -1611,7 +1609,15 @@
             numFreeSlots.set(numFreeSlots.get() - 1);
             assert (numFreeSlots.get() >= 0);
           }
-          
+          synchronized (tip) {
+            //make sure that no kill action was received for this task
+            if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+              //got killed externally while still in the launcher queue
+              addFreeSlot();
+              continue;
+            }
+            tip.slotTaken = true;
+          }
           //got a free slot. launch the task
           startNewTask(tip);
         } catch (InterruptedException e) { 
@@ -1740,6 +1746,7 @@
     private TaskStatus taskStatus; 
     private long taskTimeout;
     private String debugCommand;
+    private volatile boolean slotTaken = false;
         
     /**
      */
@@ -1833,7 +1840,7 @@
         alwaysKeepTaskFiles = false;
       }
       if (debugCommand != null || localJobConf.getProfileEnabled() ||
-          alwaysKeepTaskFiles) {
+          alwaysKeepTaskFiles || keepFailedTaskFiles) {
         //disable jvm reuse
         localJobConf.setNumTasksToExecutePerJvm(1);
       }
@@ -1885,9 +1892,16 @@
      * Kick off the task execution
      */
     public synchronized void launchTask() throws IOException {
-      localizeTask(task);
-      this.runner = task.createRunner(TaskTracker.this);
-      this.runner.start();
+      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+        localizeTask(task);
+        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        this.runner = task.createRunner(TaskTracker.this, this);
+        this.runner.start();
+        this.taskStatus.setStartTime(System.currentTimeMillis());
+      } else {
+        LOG.info("Not launching task: " + task.getTaskID() + 
+            " since it's state is " + this.taskStatus.getRunState());
+      }
     }
 
     /**
@@ -2095,13 +2109,6 @@
 
     }
     
-    synchronized void taskInitialized() {
-      if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
-        //one-way state change to INITIALIZED
-        this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
-      }
-    }
-  
 
     /**
      * Runs the script given in args
@@ -2193,7 +2200,6 @@
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
             getRunState() == TaskStatus.State.UNASSIGNED ||
-            getRunState() == TaskStatus.State.INITIALIZED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING) {
           kill(wasFailure);
         }
@@ -2209,12 +2215,12 @@
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
-          taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
+        runner.kill();
         taskStatus.setRunState((wasFailure) ? 
                                   TaskStatus.State.FAILED : 
                                   TaskStatus.State.KILLED);
@@ -2225,15 +2231,12 @@
         } else {
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
-      }
-      if (runner != null) {
-        runner.kill();
-        runner.signalDone();
-      } else {
-        if (task.isMapTask()) {
-          addFreeMapSlot();
-        } else {
-          addFreeReduceSlot();
+        if (slotTaken) {
+          if (task.isMapTask()) {
+            addFreeMapSlot();
+          } else {
+            addFreeReduceSlot();
+          }
         }
       }
     }
@@ -2344,7 +2347,7 @@
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId) 
+  public synchronized JvmTask getTask(JVMId jvmId) 
   throws IOException {
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
     if (!jvmManager.isJvmKnown(jvmId)) {
@@ -2353,41 +2356,24 @@
     }
     RunningJob rjob = runningJobs.get(jvmId.getJobId());
     if (rjob == null) { //kill the JVM since the job is dead
+      LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
+               " is dead");
       jvmManager.killJvm(jvmId);
       return new JvmTask(null, true);
     }
-    TaskInProgress t = runningTasks.get(firstTaskId);
-    //if we can give the JVM the task it is asking for, well and good;
-    //if not, we give it some other task from the same job (note that some
-    //other JVM might have run this task while this JVM was init'ing)
-    if (t == null || t.getStatus().getRunState() != 
-                     TaskStatus.State.INITIALIZED) {
-      boolean isMap = jvmId.isMapJVM();
-      synchronized (rjob) {
-        for (TaskInProgress tip : runningTasks.values()) {
-          synchronized (tip) {
-            if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
-                tip.getRunState() == TaskStatus.State.INITIALIZED
-                && ((isMap && tip.getTask().isMapTask()) ||
-                    (!isMap && !tip.getTask().isMapTask()))) {
-              t = tip;
-            }
-          }
-        }
-      }
-    }
-    //now the task could be null or we could have got a task that already
-    //ran earlier (the firstTaskId case)
-    if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
-      jvmManager.setRunningTaskForJvm(jvmId, null);  
+    TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
+    if (tip == null) {
       return new JvmTask(null, false);
     }
-    t.getStatus().setRunState(TaskStatus.State.RUNNING);
-    t.getStatus().setStartTime(System.currentTimeMillis());
-    jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
-    LOG.info("JVM with ID: " + jvmId + " given task: " + 
-        t.getTask().getTaskID().toString());
-    return new JvmTask(t.getTask(), false);
+    if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present
+      LOG.info("JVM with ID: " + jvmId + " given task: " + 
+          tip.getTask().getTaskID());
+      return new JvmTask(tip.getTask(), false);
+    } else {
+      LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " + 
+          tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState());
+      return new JvmTask(null, true);
+    }
   }
 
   /**
@@ -2543,12 +2529,6 @@
     }
   }
   
-  synchronized void taskInitialized(TaskAttemptID taskid) {
-    TaskInProgress tip = tasks.get(taskid);
-    if (tip != null) {
-      tip.taskInitialized();
-    }
-  }
 
   /**
    * A completed map task's output has been lost.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Sep 30 16:15:00 2008
@@ -207,8 +207,7 @@
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.INITIALIZED))) {
+           (state == TaskStatus.State.UNASSIGNED))) {
         mapCount++;
       }
     }
@@ -225,8 +224,7 @@
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.INITIALIZED))) {
+           (state == TaskStatus.State.UNASSIGNED))) {
         reduceCount++;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=700628&r1=700627&r2=700628&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Sep 30 16:15:00 2008
@@ -51,18 +51,18 @@
    *            stale or not. Hence the return type is a class that 
    *            encapsulates the events and whether to reset events index.
    * Version 13 changed the getTask method signature for HADOOP-249
+   * Version 14 changed the getTask method signature for HADOOP-4232
    * */
 
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
   
   /**
    * Called when a child task process starts, to get its task.
    * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
-   * @param taskid the first taskid that the JVM runs
    * @return Task object
    * @throws IOException 
    */
-  JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException;
+  JvmTask getTask(JVMId jvmId) throws IOException;
 
   /**
    * Report child's progress to parent.