You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/30 12:29:41 UTC
svn commit: r1195145 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...
Author: vinodkv
Date: Sun Oct 30 11:29:41 2011
New Revision: 1195145
URL: http://svn.apache.org/viewvc?rev=1195145&view=rev
Log:
MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a task-scheduling deadlock. Contributed by Robert Joseph Evans.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Oct 30 11:29:41 2011
@@ -1862,6 +1862,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3256. Added authorization checks for the protocol between
NodeManager and ApplicationMaster. (vinodkv via acmurthy)
+ MAPREDUCE-3274. Fixed a race condition in MRAppMaster that was causing a
+ task-scheduling deadlock. (Robert Joseph Evans via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sun Oct 30 11:29:41 2011
@@ -23,8 +23,10 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,12 +71,14 @@ public class TaskAttemptListenerImpl ext
private AppContext context;
private Server server;
- private TaskHeartbeatHandler taskHeartbeatHandler;
+ protected TaskHeartbeatHandler taskHeartbeatHandler;
private InetSocketAddress address;
- private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap =
+ private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap =
Collections.synchronizedMap(new HashMap<WrappedJvmID,
org.apache.hadoop.mapred.Task>());
private JobTokenSecretManager jobTokenSecretManager = null;
+ private Set<WrappedJvmID> pendingJvms =
+ Collections.synchronizedSet(new HashSet<WrappedJvmID>());
public TaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager) {
@@ -395,35 +399,55 @@ public class TaskAttemptListenerImpl ext
JVMId jvmId = context.jvmId;
LOG.info("JVM with ID : " + jvmId + " asked for a task");
-
- // TODO: Is it an authorised container to get a task? Otherwise return null.
-
- // TODO: Is the request for task-launch still valid?
+
+ JvmTask jvmTask = null;
+ // TODO: Is it an authorized container to get a task? Otherwise return null.
// TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
// to jobId and task-type.
WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
jvmId.getId());
- org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID);
- if (task != null) { //there may be lag in the attempt getting added here
- LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
- JvmTask jvmTask = new JvmTask(task, false);
-
- //remove the task as it is no more needed and free up the memory
- jvmIDToAttemptMap.remove(wJvmID);
-
- return jvmTask;
+ synchronized(this) {
+ if(pendingJvms.contains(wJvmID)) {
+ org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
+ if (task != null) { //there may be lag in the attempt getting added here
+ LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+ jvmTask = new JvmTask(task, false);
+
+ //remove the task as it is no more needed and free up the memory
+ //Also we have already told the JVM to process a task, so it is no
+ //longer pending, and further request should ask it to exit.
+ pendingJvms.remove(wJvmID);
+ jvmIDToActiveAttemptMap.remove(wJvmID);
+ }
+ } else {
+ LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+ jvmTask = new JvmTask(null, true);
+ }
}
- return null;
+ return jvmTask;
+ }
+
+ @Override
+ public synchronized void registerPendingTask(WrappedJvmID jvmID) {
+ //Save this JVM away as one that has not been handled yet
+ pendingJvms.add(jvmID);
}
@Override
- public void register(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+ public void registerLaunchedTask(
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
- //create the mapping so that it is easy to look up
- //when it comes back to ask for Task.
- jvmIDToAttemptMap.put(jvmID, task);
+ synchronized(this) {
+ //create the mapping so that it is easy to look up
+ //when it comes back to ask for Task.
+ jvmIDToActiveAttemptMap.put(jvmID, task);
+ //This should not need to happen here, but just to be on the safe side
+ if(!pendingJvms.add(jvmID)) {
+ LOG.warn(jvmID+" launched without first being registered");
+ }
+ }
//register this attempt
taskHeartbeatHandler.register(attemptID);
}
@@ -432,8 +456,9 @@ public class TaskAttemptListenerImpl ext
public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
WrappedJvmID jvmID) {
//remove the mapping if not already removed
- jvmIDToAttemptMap.remove(jvmID);
-
+ jvmIDToActiveAttemptMap.remove(jvmID);
+ //remove the pending if not already removed
+ pendingJvms.remove(jvmID);
//unregister this attempt
taskHeartbeatHandler.unregister(attemptID);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Sun Oct 30 11:29:41 2011
@@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+/**
+ * This class listens for changes to the state of a Task.
+ */
public interface TaskAttemptListener {
InetSocketAddress getAddress();
- void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+ /**
+ * register a JVM with the listener. This should be called as soon as a
+ * JVM ID is assigned to a task attempt, before it has been launched.
+ * @param jvmID The ID of the JVM .
+ */
+ void registerPendingTask(WrappedJvmID jvmID);
+
+ /**
+ * Register the task and task attempt with the JVM. This should be called
+ * when the JVM has been launched.
+ * @param attemptID the id of the attempt for this JVM.
+ * @param task the task itself for this JVM.
+ * @param jvmID the id of the JVM handling the task.
+ */
+ void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+ /**
+ * Unregister the JVM and the attempt associated with it. This should be
+ * called when the attempt/JVM has finished executing and is being cleaned up.
+ * @param attemptID the ID of the attempt.
+ * @param jvmID the ID of the JVM for that attempt.
+ */
void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Oct 30 11:29:41 2011
@@ -1012,6 +1012,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.jvmID = new WrappedJvmID(
taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+ taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
//launch the container
//create the container object to be launched for a given Task attempt
@@ -1106,7 +1107,7 @@ public abstract class TaskAttemptImpl im
// register it to TaskAttemptListener so that it start listening
// for it
- taskAttempt.taskAttemptListener.register(
+ taskAttempt.taskAttemptListener.registerLaunchedTask(
taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr =
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1195145&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Sun Oct 30 11:29:41 2011
@@ -0,0 +1,100 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.junit.Test;
+
+public class TestTaskAttemptListenerImpl {
+ public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+
+ public MockTaskAttemptListenerImpl(AppContext context,
+ JobTokenSecretManager jobTokenSecretManager,
+ TaskHeartbeatHandler hbHandler) {
+ super(context, jobTokenSecretManager);
+ this.taskHeartbeatHandler = hbHandler;
+ }
+
+ @Override
+ protected void registerHeartbeatHandler() {
+ //Empty
+ }
+
+ @Override
+ protected void startRpcServer() {
+ //Empty
+ }
+
+ @Override
+ protected void stopRpcServer() {
+ //Empty
+ }
+ }
+
+ @Test
+ public void testGetTask() throws IOException {
+ AppContext appCtx = mock(AppContext.class);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ MockTaskAttemptListenerImpl listener =
+ new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+ JVMId id = new JVMId("foo",1, true, 1);
+ WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
+
+ //The JVM ID has not been registered yet so we should kill it.
+ JvmContext context = new JvmContext();
+ context.jvmId = id;
+ JvmTask result = listener.getTask(context);
+ assertNotNull(result);
+ assertTrue(result.shouldDie);
+
+ //Now register the JVM, and see
+ listener.registerPendingTask(wid);
+ result = listener.getTask(context);
+ assertNull(result);
+
+ TaskAttemptId attemptID = mock(TaskAttemptId.class);
+ Task task = mock(Task.class);
+ //Now put a task with the ID
+ listener.registerLaunchedTask(attemptID, task, wid);
+ verify(hbHandler).register(attemptID);
+ result = listener.getTask(context);
+ assertNotNull(result);
+ assertFalse(result.shouldDie);
+
+ //Verify that if we call it again a second time we are told to die.
+ result = listener.getTask(context);
+ assertNotNull(result);
+ assertTrue(result.shouldDie);
+
+ listener.unregister(attemptID, wid);
+ listener.stop();
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1195145&r1=1195144&r2=1195145&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sun Oct 30 11:29:41 2011
@@ -294,11 +294,14 @@ public class MRApp extends MRAppMaster {
return null;
}
@Override
- public void register(TaskAttemptId attemptID,
+ public void registerLaunchedTask(TaskAttemptId attemptID,
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
@Override
public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
}
+ @Override
+ public void registerPendingTask(WrappedJvmID jvmID) {
+ }
};
}