You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/07 21:38:18 UTC

svn commit: r483651 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Thu Dec  7 12:38:17 2006
New Revision: 483651

URL: http://svn.apache.org/viewvc?view=rev&rev=483651
Log:
HADOOP-639.  Restructure InterTrackerProtocol to make task accounting more reliable.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec  7 12:38:17 2006
@@ -30,6 +30,9 @@
  8. HADOOP-676. Improved exceptions and error messages for common job
     input specification errors.  (Sanjay Dahiya via cutting)
 
+ 9. HADOOP-639. Restructure InterTrackerProtocol to make task
+    accounting more reliable.  (Arun C Murthy via cutting)
+
 
 Release 0.9.1 - 2006-12-06
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * The response sent by the {@link JobTracker} to the hearbeat sent
+ * periodically by the {@link TaskTracker}
+ * 
+ * @author Arun C Murthy
+ */
+class HeartbeatResponse implements Writable, Configurable {
+  Configuration conf = null;
+  short responseId;
+  TaskTrackerAction[] actions;
+
+  HeartbeatResponse() {}
+  
+  HeartbeatResponse(short responseId, TaskTrackerAction[] actions) {
+    this.responseId = responseId;
+    this.actions = actions;
+  }
+  
+  public void setResponseId(short responseId) {
+    this.responseId = responseId; 
+  }
+  
+  public short getResponseId() {
+    return responseId;
+  }
+  
+  public void setActions(TaskTrackerAction[] actions) {
+    this.actions = actions;
+  }
+  
+  public TaskTrackerAction[] getActions() {
+    return actions;
+  }
+  
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(responseId);
+    if (actions == null) {
+      WritableUtils.writeVInt(out, 0);
+    } else {
+      WritableUtils.writeVInt(out, actions.length);
+      for (TaskTrackerAction action : actions) {
+        WritableUtils.writeEnum(out, action.getActionId());
+        action.write(out);
+      }
+    }
+    //ObjectWritable.writeObject(out, actions, actions.getClass(), conf);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    this.responseId = in.readShort();
+    int length = WritableUtils.readVInt(in);
+    if (length > 0) {
+      actions = new TaskTrackerAction[length];
+      for (int i=0; i < length; ++i) {
+        TaskTrackerAction.ActionType actionType = 
+          WritableUtils.readEnum(in, TaskTrackerAction.ActionType.class);
+        actions[i] = TaskTrackerAction.createAction(actionType);
+        actions[i].readFields(in);
+      }
+    } else {
+      actions = null;
+    }
+    //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf);
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Dec  7 12:38:17 2006
@@ -27,31 +27,40 @@
  * The JobTracker is the Server, which implements this protocol.
  */ 
 interface InterTrackerProtocol extends VersionedProtocol {
-  // version 2 introduced to replace TaskStatus.State with an enum
-  public static final long versionID = 2L;
+  /**
+   * version 3 introduced to replace 
+   * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with
+   * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
+   */
+  public static final long versionID = 3L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
 
-  /** 
-   * Called regularly by the task tracker to update the status of its tasks
-   * within the job tracker.  JobTracker responds with a code that tells the 
-   * TaskTracker whether all is well.
-   *
-   * TaskTracker must also indicate whether this is the first interaction
-   * (since state refresh)
+  /**
+   * Called regularly by the {@link TaskTracker} to update the status of its 
+   * tasks within the job tracker. {@link JobTracker} responds with a 
+   * {@link HeartbeatResponse} that directs the 
+   * {@link TaskTracker} to undertake a series of 'actions' 
+   * (see {@link org.apache.hadoop.mapred.TaskTrackerAction.ActionType}).  
+   * 
+   * {@link TaskTracker} must also indicate whether this is the first 
+   * interaction (since state refresh) and acknowledge the last response
+   * it recieved from the {@link JobTracker} 
+   * 
+   * @param status the status update
+   * @param initialContact <code>true</code> if this is first interaction since
+   *                       'refresh', <code>false</code> otherwise.
+   * @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
+   *                       ready to accept new tasks to run.                 
+   * @param responseId the last responseId successfully acted upon by the
+   *                   {@link TaskTracker}.
+   * @return a {@link org.apache.hadoop.mapred.HeartbeatResponse} with 
+   *         fresh instructions.
    */
-  int emitHeartbeat(TaskTrackerStatus status, 
-                    boolean initialContact) throws IOException;
-
-  /** Called to get new tasks from from the job tracker for this tracker.*/
-  Task pollForNewTask(String trackerName) throws IOException;
-
-  /** Called to find which tasks that have been run by this tracker should now
-   * be closed because their job is complete.  This is used to, e.g., 
-   * notify a map task that its output is no longer needed and may 
-   * be removed. */
-  String[] pollForTaskWithClosedJob(String trackerName) throws IOException;
+  HeartbeatResponse heartbeat(TaskTrackerStatus status, 
+          boolean initialContact, boolean acceptNewTasks, short responseId)
+  throws IOException;
 
   /** Called by a reduce task to find which map tasks are completed.
    *

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Dec  7 12:38:17 2006
@@ -427,6 +427,9 @@
     // (trackerID->TreeSet of taskids running at that tracker)
     TreeMap trackerToTaskMap = new TreeMap();
 
+    // (trackerID --> last sent HeartBeatResponseID)
+    Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+    
     //
     // Watch and expire TaskTracker objects using these structures.
     // We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -723,6 +726,74 @@
     ////////////////////////////////////////////////////
 
     /**
+     * The periodic heartbeat mechanism between the {@link TaskTracker} and
+     * the {@link JobTracker}.
+     * 
+     * The {@link JobTracker} processes the status information sent by the 
+     * {@link TaskTracker} and responds with instructions to start/stop 
+     * tasks or jobs, and also 'reset' instructions during contingencies. 
+     */
+    public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
+            boolean initialContact, boolean acceptNewTasks, short responseId) 
+    throws IOException {
+      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
+              " (initialContact: " + initialContact + 
+              " acceptNewTasks: " + acceptNewTasks + ")" +
+              " with responseId: " + responseId);
+      
+        // First check if the last heartbeat response got through 
+        String trackerName = status.getTrackerName();
+        Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
+      
+        short newResponseId = (short)(responseId + 1);
+        if (!initialContact && oldResponseId != null && 
+                oldResponseId.shortValue() != responseId) {
+            newResponseId = oldResponseId.shortValue();
+        }
+      
+        // Process this heartbeat 
+        if (!processHeartbeat(status, initialContact, 
+                (newResponseId != responseId))) {
+            if (oldResponseId != null) {
+                trackerToHeartbeatResponseIDMap.remove(trackerName);
+            }
+
+            return new HeartbeatResponse(newResponseId, 
+                  new TaskTrackerAction[] {new ReinitTrackerAction()});
+        }
+      
+        // Initialize the response to be sent for the heartbeat
+        HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+        List<TaskTrackerAction> actions = new ArrayList();
+      
+        // Check for new tasks to be executed on the tasktracker
+        if (acceptNewTasks) {
+        Task task = getNewTaskForTaskTracker(trackerName);
+            if (task != null) {
+                LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId());
+                actions.add(new LaunchTaskAction(task));
+            }
+        }
+      
+        // Check for tasks to be killed
+        List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
+        if (killTasksList != null) {
+            actions.addAll(killTasksList);
+        }
+     
+        response.setActions(
+                actions.toArray(new TaskTrackerAction[actions.size()]));
+        
+        // Update the trackerToHeartbeatResponseIDMap
+        if (newResponseId != responseId) {
+            trackerToHeartbeatResponseIDMap.put(trackerName, 
+                    new Short(newResponseId));
+        }
+
+        return response;
+    }
+    
+    /**
      * Update the last recorded status for the given task tracker.
      * It assumes that the taskTrackers are locked on entry.
      * @author Owen O'Malley
@@ -752,16 +823,21 @@
     /**
      * Process incoming heartbeat messages from the task trackers.
      */
-    public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {
+    private synchronized boolean processHeartbeat(
+            TaskTrackerStatus trackerStatus, 
+            boolean initialContact, boolean updateStatusTimestamp) {
         String trackerName = trackerStatus.getTrackerName();
-        trackerStatus.setLastSeen(System.currentTimeMillis());
+        if (initialContact || updateStatusTimestamp) {
+          trackerStatus.setLastSeen(System.currentTimeMillis());
+        }
 
         synchronized (taskTrackers) {
             synchronized (trackerExpiryQueue) {
                 boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                              trackerStatus);
                 if (initialContact) {
-                    // If it's first contact, then clear out any state hanging around
+                    // If it's first contact, then clear out 
+                    // any state hanging around
                     if (seenBefore) {
                         lostTaskTracker(trackerName, trackerStatus.getHost());
                     }
@@ -770,7 +846,7 @@
                     if (!seenBefore) {
                         LOG.warn("Status from unknown Tracker : " + trackerName);
                         taskTrackers.remove(trackerName); 
-                        return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+                        return false;
                     }
                 }
 
@@ -782,18 +858,17 @@
 
         updateTaskStatuses(trackerStatus);
         //LOG.info("Got heartbeat from "+trackerName);
-        return InterTrackerProtocol.TRACKERS_OK;
+        return true;
     }
 
     /**
-     * A tracker wants to know if there's a Task to run.  Returns
-     * a task we'd like the TaskTracker to execute right now.
+     * Returns a task we'd like the TaskTracker to execute right now.
      *
      * Eventually this function should compute load on the various TaskTrackers,
      * and incorporate knowledge of DFS file placement.  But for right now, it
      * just grabs a single item out of the pending task list and hands it back.
      */
-    public synchronized Task pollForNewTask(String taskTracker) {
+    private synchronized Task getNewTaskForTaskTracker(String taskTracker) {
         //
         // Compute average map and reduce task numbers across pool
         //
@@ -936,23 +1011,36 @@
      * A tracker wants to know if any of its Tasks have been
      * closed (because the job completed, whether successfully or not)
      */
-    public synchronized String[] pollForTaskWithClosedJob(String taskTracker) {
-        TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
+    private synchronized List getTasksToKill(String taskTracker) {
+        Set<String> taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
         if (taskIds != null) {
-            ArrayList list = new ArrayList();
-            for (Iterator it = taskIds.iterator(); it.hasNext(); ) {
-                String taskId = (String) it.next();
-                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
-                if (tip.shouldCloseForClosedJob(taskId)) {
+            List<TaskTrackerAction> killList = new ArrayList();
+            Set<String> killJobIds = new TreeSet(); 
+            for (String killTaskId : taskIds ) {
+                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+                if (tip.shouldCloseForClosedJob(killTaskId)) {
                     // 
                     // This is how the JobTracker ends a task at the TaskTracker.
                     // It may be successfully completed, or may be killed in
                     // mid-execution.
                     //
-                   list.add(taskId);
+                    if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
+                        killList.add(new KillTaskAction(killTaskId));
+                        LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
+                    } else {
+                      //killTasksList.add(new KillJobAction(taskId));
+                        String killJobId = tip.getJob().getStatus().getJobId(); 
+                        killJobIds.add(killJobId);
+                    }
                 }
             }
-            return (String[]) list.toArray(new String[list.size()]);
+            
+            for (String killJobId : killJobIds) {
+                killList.add(new KillJobAction(killJobId));
+                LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
+            }
+
+            return killList;
         }
         return null;
     }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill the task of 
+ * a job and cleanup resources.
+ * 
+ * @author Arun C Murthy
+ */
+class KillJobAction extends TaskTrackerAction {
+  String jobId;
+
+  public KillJobAction() {
+    super(ActionType.KILL_JOB);
+  }
+
+  public KillJobAction(String taskId) {
+    super(ActionType.KILL_JOB);
+    this.jobId = taskId;
+  }
+  
+  public String getJobId() {
+    return jobId;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, jobId);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    jobId = Text.readString(in);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill a task.
+ * 
+ * @author Arun C Murthy
+ */
+class KillTaskAction extends TaskTrackerAction {
+  String taskId;
+  
+  public KillTaskAction() {
+    super(ActionType.KILL_TASK);
+  }
+  
+  public KillTaskAction(String taskId) {
+    super(ActionType.KILL_TASK);
+    this.taskId = taskId;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, taskId);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    taskId = Text.readString(in);
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to launch a new task.
+ * 
+ * @author Arun C Murthy
+ */
+class LaunchTaskAction extends TaskTrackerAction {
+  private Task task;
+
+  public LaunchTaskAction() {
+    super(ActionType.LAUNCH_TASK);
+  }
+  
+  public LaunchTaskAction(Task task) {
+    super(ActionType.LAUNCH_TASK);
+    this.task = task;
+  }
+  
+  public Task getTask() {
+    return task;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(task.isMapTask());
+    task.write(out);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    boolean isMapTask = in.readBoolean();
+    if (isMapTask) {
+      task = new MapTask();
+    } else {
+      task = new ReduceTask();
+    }
+    task.readFields(in);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} 
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to reinitialize itself.
+ * 
+ * @author Arun C Murthy
+ */
+class ReinitTrackerAction extends TaskTrackerAction {
+
+  public ReinitTrackerAction() {
+    super(ActionType.REINIT_TRACKER);
+  }
+  
+  public void write(DataOutput out) throws IOException {}
+
+  public void readFields(DataInput in) throws IOException {}
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Dec  7 12:38:17 2006
@@ -228,7 +228,12 @@
             (job.getStatus().getRunState() != JobStatus.RUNNING)) {
             tasksReportedClosed.add(taskid);
             return true;
-        } else {
+        } else if( !isMapTask() && isComplete() && 
+                ! tasksReportedClosed.contains(taskid) ){
+            tasksReportedClosed.add(taskid);
+            return true; 
+        }
+        else {
             return false;
         }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec  7 12:38:17 2006
@@ -68,6 +68,9 @@
 
     Server taskReportServer = null;
     InterTrackerProtocol jobClient;
+    
+    // last heartbeat response recieved
+    short heartbeatResponseId = -1;
 
     StatusHttpServer server = null;
     
@@ -187,7 +190,7 @@
         }
       }
     }
-    
+
     static String getCacheSubdir() {
       return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
     }
@@ -458,15 +461,23 @@
               }
             }
 
-            if (!transmitHeartBeat()) {
+            // Send the heartbeat and process the jobtracker's directives
+            HeartbeatResponse heartbeatResponse = transmitHeartBeat();
+            TaskTrackerAction[] actions = heartbeatResponse.getActions();
+            LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
+                    heartbeatResponse.getResponseId() + " and " + 
+                    ((actions != null) ? actions.length : 0) + " actions");
+            
+            if (reinitTaskTracker(actions)) {
               return State.STALE;
             }
+            
             lastHeartbeat = now;
             justStarted = false;
 
-            checkForNewTasks();
+            checkAndStartNewTasks(actions);
             markUnresponsiveTasks();
-            closeCompletedTasks();
+            closeCompletedTasks(actions);
             killOverflowingTasks();
             
             //we've cleaned up, resume normal operation
@@ -498,56 +509,94 @@
      * @return false if the tracker was unknown
      * @throws IOException
      */
-    private boolean transmitHeartBeat() throws IOException {
+    private HeartbeatResponse transmitHeartBeat() throws IOException {
       //
       // Build the heartbeat information for the JobTracker
       //
-      List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+      List<TaskStatus> taskReports = 
+        new ArrayList<TaskStatus>(runningTasks.size());
       synchronized (this) {
-          for (TaskInProgress tip: runningTasks.values()) {
-              taskReports.add(tip.createStatus());
-          }
+        for (TaskInProgress tip: runningTasks.values()) {
+          taskReports.add(tip.createStatus());
+        }
       }
       TaskTrackerStatus status = 
         new TaskTrackerStatus(taskTrackerName, localHostname, 
-                              httpPort, taskReports, 
-                              failures); 
-
+                httpPort, taskReports, 
+                failures); 
+      
+      //
+      // Check if we should ask for a new Task
+      //
+      boolean askForNewTask = false; 
+      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+              acceptNewTasks) {
+        checkLocalDirs(fConf.getLocalDirs());
+        
+        if (enoughFreeSpace(minSpaceStart)) {
+          askForNewTask = true;
+        }
+      }
+      
       //
       // Xmit the heartbeat
       //
+      HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
+              justStarted, askForNewTask, 
+              heartbeatResponseId);
+      heartbeatResponseId = heartbeatResponse.getResponseId();
       
-      int resultCode = jobClient.emitHeartbeat(status, justStarted);
       synchronized (this) {
-        for (TaskStatus taskStatus: taskReports) {
-            if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-                if (taskStatus.getIsMap()) {
-                    mapTotal--;
-                } else {
-                    reduceTotal--;
-                }
-                myMetrics.completeTask();
-                runningTasks.remove(taskStatus.getTaskId());
+        for (TaskStatus taskStatus : taskReports) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            if (taskStatus.getIsMap()) {
+              mapTotal--;
+            } else {
+              reduceTotal--;
             }
+            myMetrics.completeTask();
+            runningTasks.remove(taskStatus.getTaskId());
+          }
         }
       }
-      return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+      return heartbeatResponse;
     }
 
     /**
+     * Check if the jobtracker directed a 'reset' of the tasktracker.
+     * 
+     * @param actions the directives of the jobtracker for the tasktracker.
+     * @return <code>true</code> if tasktracker is to be reset, 
+     *         <code>false</code> otherwise.
+     */
+    private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+      if (actions != null) {
+        for (TaskTrackerAction action : actions) {
+          if (action.getActionId() == 
+            TaskTrackerAction.ActionType.REINIT_TRACKER) {
+            LOG.info("Recieved RenitTrackerAction from JobTracker");
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+    
+    /**
      * Check to see if there are any new tasks that we should run.
      * @throws IOException
      */
-    private void checkForNewTasks() throws IOException {
-      //
-      // Check if we should ask for a new Task
-      //
-      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
-          acceptNewTasks) {
-        checkLocalDirs(fConf.getLocalDirs());
-        
-        if (enoughFreeSpace(minSpaceStart)) {
-          Task t = jobClient.pollForNewTask(taskTrackerName);
+    private void checkAndStartNewTasks(TaskTrackerAction[] actions) 
+    throws IOException {
+      if (actions == null) {
+        return;
+      }
+      
+      for (TaskTrackerAction action : actions) {
+        if (action.getActionId() == 
+          TaskTrackerAction.ActionType.LAUNCH_TASK) {
+          Task t = ((LaunchTaskAction)(action)).getTask();
+          LOG.info("LaunchTaskAction: " + t.getTaskId());
           if (t != null) {
             startNewTask(t);
           }
@@ -580,24 +629,73 @@
      * Ask the JobTracker if there are any tasks that we should clean up,
      * either because we don't need them any more or because the job is done.
      */
-    private void closeCompletedTasks() throws IOException {
-      String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
-      if (toCloseIds != null) {
-        synchronized (this) {
-          for (int i = 0; i < toCloseIds.length; i++) {
-            TaskInProgress tip = tasks.get(toCloseIds[i]);
-            if (tip != null) {
-              // remove the task from running jobs, removing the job if 
-              // it is the last task
-              removeTaskFromJob(tip.getTask().getJobId(), tip);
-              tasksToCleanup.put(tip);
+    private void closeCompletedTasks(TaskTrackerAction[] actions) 
+    throws IOException {
+      if (actions == null) {
+        return;
+      }
+      
+      for (TaskTrackerAction action : actions) {
+        TaskTrackerAction.ActionType actionType = action.getActionId();
+        
+        if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
+          String jobId = ((KillJobAction)action).getJobId();
+          LOG.info("Received 'KillJobAction' for job: " + jobId);
+          synchronized (runningJobs) {
+            RunningJob rjob = runningJobs.get(jobId);
+            if (rjob == null) {
+              LOG.warn("Unknown job " + jobId + " being deleted.");
             } else {
-              LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
+              synchronized (rjob) {
+                int noJobTasks = rjob.tasks.size(); 
+                int taskCtr = 0;
+                
+                // Add this tips of this job to queue of tasks to be purged 
+                for (TaskInProgress tip : rjob.tasks) {
+                  // Purge the job files for the last element in rjob.tasks
+                  if (++taskCtr == noJobTasks) {
+                    tip.setPurgeJobFiles(true);
+                  }
+
+                  tasksToCleanup.put(tip);
+                }
+                
+                // Remove this job 
+                rjob.tasks.clear();
+                runningJobs.remove(jobId);
+              }
             }
           }
+        } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
+          String taskId = ((KillTaskAction)action).getTaskId();
+          LOG.info("Received KillTaskAction for task: " + taskId);
+          purgeTask(tasks.get(taskId), false);
         }
       }
     }
+    
+    /**
+     * Remove the tip and update all relevant state.
+     * 
+     * @param tip {@link TaskInProgress} to be removed.
+     * @param purgeJobFiles <code>true</code> if the job files are to be
+     *                      purged, <code>false</code> otherwise.
+     */
+    private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
+      if (tip != null) {
+        LOG.info("About to purge task: " + tip.getTask().getTaskId());
+        
+        // Cleanup the job files? 
+        tip.setPurgeJobFiles(purgeJobFiles);
+        
+        // Remove the task from running jobs, 
+        // removing the job if it's the last task
+        removeTaskFromJob(tip.getTask().getJobId(), tip);
+        
+        // Add this tip to queue of tasks to be purged 
+        tasksToCleanup.put(tip);
+      }
+    }
 
     /** Check if we're dangerously low on disk space
      * If so, kill jobs to free up space and make sure
@@ -829,6 +927,9 @@
         private boolean alwaysKeepTaskFiles;
         private TaskStatus taskStatus ; 
         private boolean keepJobFiles;
+        
+        /** Cleanup the job files when the job is complete (done/failed) */
+        private boolean purgeJobFiles = false;
 
         /**
          */
@@ -893,6 +994,10 @@
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
         }
         
+        public void setPurgeJobFiles(boolean purgeJobFiles) {
+          this.purgeJobFiles = purgeJobFiles;
+        }
+        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -1046,10 +1151,12 @@
               LOG.warn("Error in deleting reduce temporary output",e); 
             }
             
-            // delete the job diretory for this task 
-            // since the job is done/failed
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
-                    JOBCACHE + Path.SEPARATOR +  task.getJobId());
+            // Delete the job directory for this  
+            // task if the job is done/failed
+            if (purgeJobFiles) {
+              this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                      JOBCACHE + Path.SEPARATOR +  task.getJobId());
+            }
         }
 
         /**

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Dec  7 12:38:17 2006
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to take some 'action'. 
+ * 
+ * @author Arun C Murthy
+ */
+abstract class TaskTrackerAction implements Writable {
+  
+  /**
+   * Ennumeration of various 'actions' that the {@link JobTracker}
+   * directs the {@link TaskTracker} to perform periodically.
+   * 
+   * @author Arun C Murthy
+   */
+  public static enum ActionType {
+    /** Launch a new task. */
+    LAUNCH_TASK,
+    
+    /** Kill a task. */
+    KILL_TASK,
+    
+    /** Kill any tasks of this job and cleanup. */
+    KILL_JOB,
+    
+    /** Reinitialize the tasktracker. */
+    REINIT_TRACKER
+  };
+  
+  /**
+   * A factory-method to create objects of given {@link ActionType}. 
+   * @param actionType the {@link ActionType} of object to create.
+   * @return an object of {@link ActionType}.
+   */
+  public static TaskTrackerAction createAction(ActionType actionType) {
+    TaskTrackerAction action = null;
+    
+    switch (actionType) {
+      case LAUNCH_TASK:
+        {
+          action = new LaunchTaskAction();
+        }
+        break;
+        case KILL_TASK:
+        {
+          action = new KillTaskAction();
+        }
+        break;
+      case KILL_JOB:
+        {
+          action = new KillJobAction();
+        }
+        break;
+      case REINIT_TRACKER:
+        {
+          action = new ReinitTrackerAction();
+        }
+        break;
+    }
+
+    return action;
+  }
+  
+  private ActionType actionType;
+  
+  protected TaskTrackerAction(ActionType actionType) {
+    this.actionType = actionType;
+  }
+  
+  /**
+   * Return the {@link ActionType}.
+   * @return the {@link ActionType}.
+   */
+  ActionType getActionId() {
+    return actionType;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, actionType);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    actionType = WritableUtils.readEnum(in, ActionType.class);
+  }
+}