You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2010/02/16 11:52:29 UTC

svn commit: r910465 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java

Author: yhemanth
Date: Tue Feb 16 10:52:28 2010
New Revision: 910465

URL: http://svn.apache.org/viewvc?rev=910465&view=rev
Log:
MAPREDUCE-1398. Fix TaskLauncher to stop waiting for slots on a TIP that is killed / failed. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=910465&r1=910464&r2=910465&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Feb 16 10:52:28 2010
@@ -351,6 +351,10 @@
     tasks like job setup/cleanup and task cleanup.
     (Amareshwari Sriramadasu via yhemanth)
 
+    MAPREDUCE-1398. Fix TaskLauncher to stop waiting for slots on a TIP that
+    is killed / failed.
+    (Amareshwari Sriramadasu via yhemanth)
+ 
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=910465&r1=910464&r2=910465&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Feb 16 10:52:28 2010
@@ -25,7 +25,6 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -82,7 +81,6 @@
 import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -377,14 +375,7 @@
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
               } else if (action instanceof KillTaskAction) {
-                TaskInProgress tip;
-                KillTaskAction killAction = (KillTaskAction) action;
-                synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskID());
-                }
-                LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskID());
-                purgeTask(tip, false);
+                processKillTaskAction((KillTaskAction) action);
               } else {
                 LOG.error("Non-delete action given to cleanup thread: "
                           + action);
@@ -396,6 +387,16 @@
         }
       }, "taskCleanup");
 
+  void processKillTaskAction(KillTaskAction killAction) throws IOException { // package-private so tests (TestTaskLauncher) can drive a kill directly
+    TaskInProgress tip; // NOTE(review): lookup may yield null for an unknown task id; confirm purgeTask tolerates null
+    synchronized (TaskTracker.this) { // read the tasks map under the tracker lock; purgeTask below runs outside it
+      tip = tasks.get(killAction.getTaskID());
+    }
+    LOG.info("Received KillTaskAction for task: " + 
+             killAction.getTaskID());
+    purgeTask(tip, false);
+  }
+  
   public TaskController getTaskController() {
     return taskController;
   }
@@ -695,7 +696,7 @@
     LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculatorPlugin);
     initializeMemoryManagement();
 
-    this.indexCache = new IndexCache(this.fConf);
+    setIndexCache(new IndexCache(this.fConf));
 
     mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
@@ -1262,6 +1263,10 @@
     return directoryCleanupThread;
   }
 
+  void setIndexCache(IndexCache cache) { // package-private hook so tests can inject an IndexCache without full initialization
+    this.indexCache = cache;
+  }
+  
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -2008,7 +2013,7 @@
     }
   }
   
-  private class TaskLauncher extends Thread {
+  class TaskLauncher extends Thread {
     private IntWritable numFreeSlots;
     private final int maxSlots;
     private List<TaskInProgress> tasksToLaunch;
@@ -2042,6 +2047,18 @@
       }
     }
     
+    void notifySlots() { // wake run() if it is blocked in numFreeSlots.wait(), so it re-checks whether the task can still be launched
+      synchronized (numFreeSlots) {
+        numFreeSlots.notifyAll();
+      }      
+    }
+    
+    int getNumWaitingTasksToLaunch() { // size of tasksToLaunch, i.e. queued tasks not yet taken by run(); used by tests
+      synchronized (tasksToLaunch) {
+        return tasksToLaunch.size();
+      }
+    }
+    
     public void run() {
       while (!Thread.interrupted()) {
         try {
@@ -2059,12 +2076,33 @@
           }
           //wait for free slots to run
           synchronized (numFreeSlots) {
+            boolean canLaunch = true;
             while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+              //Make sure that there is no kill task action for this task!
+              //We are not locking tip here, because it would reverse the
+              //locking order!
+              //Also, Lock for the tip is not required here! because :
+              // 1. runState of TaskStatus is volatile
+              // 2. Any notification is not missed because notification is
+              // synchronized on numFreeSlots. So, while we are doing the check,
+              // if the tip is half way through the kill(), we don't miss
+              // notification for the following wait(). 
+              if (!tip.canBeLaunched()) {
+                //got killed externally while still in the launcher queue
+                LOG.info("Not blocking slots for " + task.getTaskID()
+                    + " as it got killed externally. Task's state is "
+                    + tip.getRunState());
+                canLaunch = false;
+                break;
+              }              
               LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + 
                        " to launch " + task.getTaskID() + ", currently we have " + 
                        numFreeSlots.get() + " free slots");
               numFreeSlots.wait();
             }
+            if (!canLaunch) {
+              continue;
+            }
             LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
                      " and trying to launch "+tip.getTask().getTaskID() + 
                      " which needs " + task.getNumSlotsRequired() + " slots");
@@ -2073,10 +2111,10 @@
           }
           synchronized (tip) {
             //to make sure that there is no kill task action for this
-            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
-                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
-                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
+            if (!tip.canBeLaunched()) {
               //got killed externally while still in the launcher queue
+              LOG.info("Not launching task " + task.getTaskID() + " as it got"
+                + " killed externally. Task's state is " + tip.getRunState());
               addFreeSlots(task.getNumSlotsRequired());
               continue;
             }
@@ -2116,7 +2154,7 @@
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
    */
-  private void startNewTask(TaskInProgress tip) {
+  void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
     } catch (Throwable e) {
@@ -2397,6 +2435,13 @@
    	  return this.taskStatus.inTaskCleanupPhase();
     }
     
+    // True only while the run state is UNASSIGNED, FAILED_UNCLEAN or KILLED_UNCLEAN (e.g. false once killed while queued for launch)
+    boolean canBeLaunched() {
+      return (getRunState() == TaskStatus.State.UNASSIGNED ||
+          getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+          getRunState() == TaskStatus.State.KILLED_UNCLEAN);
+    }
+
     /**
      * The task is reporting its progress
      */
@@ -2804,6 +2849,11 @@
           launcher.addFreeSlots(task.getNumSlotsRequired());
         }
         slotTaken = false;
+      } else {
+        // wake up the launcher. it may be waiting to block slots for this task.
+        if (launcher != null) {
+          launcher.notifySlots();
+        }        
       }
     }
 
@@ -3726,7 +3776,7 @@
     }
   }
 
-  private void setTaskMemoryManagerEnabledFlag() {
+  void setTaskMemoryManagerEnabledFlag() {
     if (!ProcfsBasedProcessTree.isAvailable()) {
       LOG.info("ProcessTree implementation is missing on this system. "
           + "TaskMemoryManager is disabled.");

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java?rev=910465&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLauncher.java Tue Feb 16 10:52:28 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.TaskTracker.TaskLauncher;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.junit.Test;
+
+/**
+ * Tests {@link TaskLauncher}: a task killed while the launcher is blocked
+ * waiting for its slots must not stall the launching of later tasks.
+ */
+public class TestTaskLauncher {
+  private static final int expectedLaunchAttemptId = 1;
+
+  private static class MyTaskTracker extends TaskTracker {
+    // Overrides startNewTask only to mark the task RUNNING (after checking it
+    // is the expected attempt); the task is never actually launched.
+    @Override
+    void startNewTask(TaskInProgress tip) {
+      assertEquals(expectedLaunchAttemptId, tip.getTask().getTaskID().getId());
+      tip.getStatus().setRunState(TaskStatus.State.RUNNING);
+    }
+  }
+
+  /**
+   * Tests the case "task waiting to be launched is killed externally".
+   *
+   * Launches a task that needs more slots than exist, so it waits forever.
+   * Kills the task and checks that the launcher comes out of the wait and
+   * picks up another task.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testExternalKillForLaunchTask() throws IOException {
+    // set up a TaskTracker
+    JobConf ttConf = new JobConf();
+    ttConf.setInt(TTConfig.TT_MAP_SLOTS, 4);
+    TaskTracker tt = new MyTaskTracker();
+    tt.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    tt.setConf(ttConf);
+    tt.setIndexCache(new IndexCache(ttConf));
+    tt.setTaskMemoryManagerEnabledFlag();
+
+    // start map-task launcher with four slots
+    TaskLauncher mapLauncher = tt.new TaskLauncher(TaskType.MAP, 4);
+    mapLauncher.start();
+
+    // launch a task which requires five slots
+    String jtId = "test";
+    TaskAttemptID attemptID = new TaskAttemptID(jtId, 1, TaskType.MAP, 0, 0);
+    Task task = new MapTask(null, attemptID, 0, null, 5);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(task));
+    // verify that task is added to runningTasks
+    TaskInProgress killTip = tt.runningTasks.get(attemptID);
+    assertNotNull(killTip);
+
+    // wait for a while for launcher to pick up the task
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (mapLauncher.getNumWaitingTasksToLaunch() == 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+    assertEquals("Launcher didn't pick up the task " + attemptID + " to launch",
+        0, mapLauncher.getNumWaitingTasksToLaunch());
+
+    // Now that the launcher has picked up the task, it waits until all five
+    // slots are available, i.e. it waits forever.
+    // Kill the task so that the map launcher comes out of the wait.
+    tt.processKillTaskAction(new KillTaskAction(attemptID));
+    assertEquals(TaskStatus.State.KILLED, killTip.getRunState());
+
+    // launch another attempt which requires only one slot
+    TaskAttemptID runningAttemptID = new TaskAttemptID(jtId, 1, TaskType.MAP,
+        0, expectedLaunchAttemptId);
+    mapLauncher.addToTaskQueue(new LaunchTaskAction(new MapTask(null,
+        runningAttemptID, 0, null, 1)));
+    TaskInProgress runningTip = tt.runningTasks.get(runningAttemptID);
+    assertNotNull(runningTip);
+
+    // wait for a while for the task to be launched
+    // this loop waits at most for 30 seconds
+    for (int i = 0; i < 300; i++) {
+      if (runningTip.getRunState().equals(TaskStatus.State.RUNNING)) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+    }
+
+    // verify that the task went to running
+    assertEquals(TaskStatus.State.RUNNING, runningTip.getRunState());
+  }
+
+}