You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/30 12:29:41 UTC

svn commit: r1195145 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...

Author: vinodkv
Date: Sun Oct 30 11:29:41 2011
New Revision: 1195145

URL: http://svn.apache.org/viewvc?rev=1195145&view=rev
Log:
MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a task-scheduling deadlock. Contributed by Robert Joseph Evans.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Oct 30 11:29:41 2011
@@ -1862,6 +1862,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3256. Added authorization checks for the protocol between
     NodeManager and ApplicationMaster. (vinodkv via acmurthy) 
 
+    MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a
+    task-scheduling deadlock. (Robert Joseph Evans via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sun Oct 30 11:29:41 2011
@@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl ext
 
   private AppContext context;
   private Server server;
-  private TaskHeartbeatHandler taskHeartbeatHandler;
+  protected TaskHeartbeatHandler taskHeartbeatHandler;
   private InetSocketAddress address;
-  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap = 
+  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = 
     Collections.synchronizedMap(new HashMap<WrappedJvmID, 
         org.apache.hadoop.mapred.Task>());
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private Set<WrappedJvmID> pendingJvms =
+    Collections.synchronizedSet(new HashSet<WrappedJvmID>());
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager) {
@@ -395,35 +399,55 @@ public class TaskAttemptListenerImpl ext
 
     JVMId jvmId = context.jvmId;
     LOG.info("JVM with ID : " + jvmId + " asked for a task");
-
-    // TODO: Is it an authorised container to get a task? Otherwise return null.
-
-    // TODO: Is the request for task-launch still valid?
+    
+    JvmTask jvmTask = null;
+    // TODO: Is it an authorized container to get a task? Otherwise return null.
 
     // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
     // to jobId and task-type.
 
     WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
         jvmId.getId());
-    org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID);
-    if (task != null) { //there may be lag in the attempt getting added here
-      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-      JvmTask jvmTask = new JvmTask(task, false);
-      
-      //remove the task as it is no more needed and free up the memory
-      jvmIDToAttemptMap.remove(wJvmID);
-      
-      return jvmTask;
+    synchronized(this) {
+      if(pendingJvms.contains(wJvmID)) {
+        org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
+        if (task != null) { //there may be lag in the attempt getting added here
+         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+          jvmTask = new JvmTask(task, false);
+
+          //remove the task as it is no longer needed and free up the memory.
+          //Also we have already told the JVM to process a task, so it is no
+          //longer pending, and further requests should ask it to exit.
+          pendingJvms.remove(wJvmID);
+          jvmIDToActiveAttemptMap.remove(wJvmID);
+        }
+      } else {
+        LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+        jvmTask = new JvmTask(null, true);
+      }
     }
-    return null;
+    return jvmTask;
+  }
+  
+  @Override
+  public synchronized void registerPendingTask(WrappedJvmID jvmID) {
+    //Save this JVM away as one that has not been handled yet
+    pendingJvms.add(jvmID);
   }
 
   @Override
-  public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+  public void registerLaunchedTask(
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
       org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
-    //create the mapping so that it is easy to look up
-    //when it comes back to ask for Task.
-    jvmIDToAttemptMap.put(jvmID, task);
+    synchronized(this) {
+      //create the mapping so that it is easy to look up
+      //when it comes back to ask for Task.
+      jvmIDToActiveAttemptMap.put(jvmID, task);
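+      //This should not need to happen here, but just to be on the safe side. NOTE(review): Set.add() returns false when jvmID is already present, so the warning below looks inverted - confirm.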
+      if(!pendingJvms.add(jvmID)) {
+        LOG.warn(jvmID+" launched without first being registered");
+      }
+    }
     //register this attempt
     taskHeartbeatHandler.register(attemptID);
   }
@@ -432,8 +456,9 @@ public class TaskAttemptListenerImpl ext
   public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
       WrappedJvmID jvmID) {
     //remove the mapping if not already removed
-    jvmIDToAttemptMap.remove(jvmID);
-
+    jvmIDToActiveAttemptMap.remove(jvmID);
+    //remove the pending if not already removed
+    pendingJvms.remove(jvmID);
     //unregister this attempt
     taskHeartbeatHandler.unregister(attemptID);
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Sun Oct 30 11:29:41 2011
@@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 
+/**
+ * This interface listens for changes to the state of a Task.
+ */
 public interface TaskAttemptListener {
 
   InetSocketAddress getAddress();
 
-  void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+  /**
+   * Register a JVM with the listener.  This should be called as soon as a
+   * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param jvmID The ID of the JVM.
+   */
+  void registerPendingTask(WrappedJvmID jvmID);
+  
+  /**
+   * Register the task and task attempt with the JVM.  This should be called
+   * when the JVM has been launched.
+   * @param attemptID the id of the attempt for this JVM.
+   * @param task the task itself for this JVM.
+   * @param jvmID the id of the JVM handling the task.
+   */
+  void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
 
+  /**
+   * Unregister the JVM and the attempt associated with it.  This should be 
+   * called when the attempt/JVM has finished executing and is being cleaned up.
+   * @param attemptID the ID of the attempt.
+   * @param jvmID the ID of the JVM for that attempt.
+   */
   void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
 
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Oct 30 11:29:41 2011
@@ -1012,6 +1012,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+      taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1106,7 +1107,7 @@ public abstract class TaskAttemptImpl im
 
       // register it to TaskAttemptListener so that it start listening
       // for it
-      taskAttempt.taskAttemptListener.register(
+      taskAttempt.taskAttemptListener.registerLaunchedTask(
           taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1195145&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Sun Oct 30 11:29:41 2011
@@ -0,0 +1,100 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.junit.Test;
+
+public class TestTaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+
+    public MockTaskAttemptListenerImpl(AppContext context,
+        JobTokenSecretManager jobTokenSecretManager,
+        TaskHeartbeatHandler hbHandler) {
+      super(context, jobTokenSecretManager);
+      this.taskHeartbeatHandler = hbHandler;
+    }
+    
+    @Override
+    protected void registerHeartbeatHandler() {
+      //Empty
+    }
+
+    @Override
+    protected void startRpcServer() {
+      //Empty
+    }
+    
+    @Override
+    protected void stopRpcServer() {
+      //Empty
+    }
+  }
+  
+  @Test
+  public void testGetTask() throws IOException {
+    AppContext appCtx = mock(AppContext.class);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
+    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    MockTaskAttemptListenerImpl listener = 
+      new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+    JVMId id = new JVMId("foo",1, true, 1);
+    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+
+    //The JVM ID has not been registered yet so we should kill it.
+    JvmContext context = new JvmContext();
+    context.jvmId = id; 
+    JvmTask result = listener.getTask(context);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+    
+    //Now register the JVM; no task has been launched for it yet, so getTask returns null
+    listener.registerPendingTask(wid);
+    result = listener.getTask(context);
+    assertNull(result);
+    
+    TaskAttemptId attemptID = mock(TaskAttemptId.class);
+    Task task = mock(Task.class);
+    //Now put a task with the ID
+    listener.registerLaunchedTask(attemptID, task, wid);
+    verify(hbHandler).register(attemptID);
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertFalse(result.shouldDie);
+    
+    //Verify that if we call it a second time we are told to die.
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+    
+    listener.unregister(attemptID, wid);
+    listener.stop();
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sun Oct 30 11:29:41 2011
@@ -294,11 +294,14 @@ public class MRApp extends MRAppMaster {
         return null;
       }
       @Override
-      public void register(TaskAttemptId attemptID, 
+      public void registerLaunchedTask(TaskAttemptId attemptID, 
           org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
+      @Override
+      public void registerPendingTask(WrappedJvmID jvmID) {
+      }
     };
   }