You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/11/11 05:48:01 UTC

svn commit: r712940 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JvmManager.java src/mapred/org/apache/hadoop/mapred/TaskRunner.java src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: ddas
Date: Mon Nov 10 20:48:01 2008
New Revision: 712940

URL: http://svn.apache.org/viewvc?rev=712940&view=rev
Log:
HADOOP-4595. Fixes two race conditions: one in updating the free slot count, and another in starting the MapEventsFetcher thread. Contributed by Devaraj Das.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=712940&r1=712939&r2=712940&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 10 20:48:01 2008
@@ -1090,6 +1090,9 @@
     HADOOP-4282. Some user facing URLs are not filtered by user filters.
     (szetszwo)
 
+    HADOOP-4595. Fixes two race conditions - one to do with updating free slot count,
+    and another to do with starting the MapEventsFetcher thread. (ddas)
+
 Release 0.18.3 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=712940&r1=712939&r2=712940&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Mon Nov 10 20:48:01 2008
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -177,7 +178,14 @@
     }
     
     synchronized public void stop() {
-      for (JvmRunner jvm : jvmIdToRunner.values()) {
+      //Since the kill() method invoked below removes an entry
+      //from the jvmIdToRunner map, we create a copy of the
+      //values and iterate over that copy (if we iterated over
+      //the map's values directly, we would encounter a
+      //ConcurrentModificationException).
+      List <JvmRunner> list = new ArrayList<JvmRunner>();
+      list.addAll(jvmIdToRunner.values());
+      for (JvmRunner jvm : list) {
         jvm.kill();
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=712940&r1=712939&r2=712940&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Mon Nov 10 20:48:01 2008
@@ -459,11 +459,6 @@
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
       tracker.reportTaskFinished(t.getTaskID(), false);
-      if (t.isMapTask()) {
-        tracker.addFreeMapSlot();
-      } else {
-        tracker.addFreeReduceSlot();
-      }
     }
   }
   

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=712940&r1=712939&r2=712940&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Nov 10 20:48:01 2008
@@ -461,7 +461,7 @@
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
-        
+    this.running = true;    
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
     mapEventsFetcher.setDaemon(true);
@@ -484,7 +484,6 @@
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
     reduceLauncher.start();
-    this.running = true;
   }
   
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -781,20 +780,8 @@
 
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
     synchronized (tip) {
-      try {
-        tip.setJobConf(jobConf);
-        tip.launchTask();
-      } catch (Throwable ie) {
-        tip.taskStatus.setRunState(TaskStatus.State.FAILED);
-        try {
-          tip.cleanup(true);
-        } catch (Throwable ie2) {
-          // Ignore it, we are just trying to cleanup.
-        }
-        String error = StringUtils.stringifyException(ie);
-        tip.reportDiagnosticInfo(error);
-        LOG.info(error);
-      }
+      tip.setJobConf(jobConf);
+      tip.launchTask();
     }
   }
     
@@ -851,8 +838,6 @@
     this.mapEventsFetcher.interrupt();
     
     //stop the launchers
-    mapLauncher.cleanTaskQueue();
-    reduceLauncher.cleanTaskQueue();
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
     
@@ -1539,14 +1524,6 @@
   public JvmManager getJvmManagerInstance() {
     return jvmManager;
   }
-
-  public void addFreeMapSlot() {
-    mapLauncher.addFreeSlot();
-  }
-  
-  public void addFreeReduceSlot() {
-    reduceLauncher.addFreeSlot();
-  }
   
   private void addToTaskQueue(LaunchTaskAction action) {
     if (action.getTask().isMapTask()) {
@@ -1571,7 +1548,7 @@
 
     public void addToTaskQueue(LaunchTaskAction action) {
       synchronized (tasksToLaunch) {
-        TaskInProgress tip = registerTask(action);
+        TaskInProgress tip = registerTask(action, this);
         tasksToLaunch.add(tip);
         tasksToLaunch.notifyAll();
       }
@@ -1632,10 +1609,11 @@
       }
     }
   }
-  private TaskInProgress registerTask(LaunchTaskAction action) {
+  private TaskInProgress registerTask(LaunchTaskAction action, 
+      TaskLauncher launcher) {
     Task t = action.getTask();
     LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
-    TaskInProgress tip = new TaskInProgress(t, this.fConf);
+    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
       runningTasks.put(t.getTaskID(), tip);
@@ -1667,6 +1645,7 @@
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
+        tip.cleanup(true);
       } catch (IOException ie2) {
         LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
                  StringUtils.stringifyException(ie2));          
@@ -1750,11 +1729,17 @@
     private long taskTimeout;
     private String debugCommand;
     private volatile boolean slotTaken = false;
+    private TaskLauncher launcher;
         
     /**
      */
     public TaskInProgress(Task task, JobConf conf) {
+      this(task, conf, null);
+    }
+    
+    public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
       this.task = task;
+      this.launcher = launcher;
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
@@ -2234,13 +2219,16 @@
         } else {
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
-        if (slotTaken) {
-          if (task.isMapTask()) {
-            addFreeMapSlot();
-          } else {
-            addFreeReduceSlot();
-          }
+      }
+      releaseSlot();
+    }
+    
+    private synchronized void releaseSlot() {
+      if (slotTaken) {
+        if (launcher != null) {
+          launcher.addFreeSlot();
         }
+        slotTaken = false;
       }
     }
 
@@ -2518,6 +2506,11 @@
     if (tip != null) {
       if (!commitPending) {
         tip.taskFinished();
+        // Remove the entry from taskMemoryManagerThread's data structures.
+        if (isTaskMemoryManagerEnabled()) {
+          taskMemoryManager.removeTask(taskid);
+        }
+        tip.releaseSlot();
       }
       synchronized(finishedCount) {
         finishedCount[0]++;
@@ -2526,10 +2519,6 @@
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
-    // Remove the entry from taskMemoryManagerThread's data structures.
-    if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.removeTask(taskid);
-    }
   }