You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/12/01 09:35:21 UTC
svn commit: r1208994 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...
Author: mahadev
Date: Thu Dec 1 08:35:20 2011
New Revision: 1208994
URL: http://svn.apache.org/viewvc?rev=1208994&view=rev
Log:
MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Dec 1 08:35:20 2011
@@ -204,6 +204,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3488. Streaming jobs are failing because the main class
isnt set in the pom files. (mahadev)
+
+ MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with
+ java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
Release 0.23.0 - 2011-11-01
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Dec 1 08:35:20 2011
@@ -217,8 +217,7 @@ public class MRAppMaster extends Composi
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
- recoveryServ = new RecoveryService(appAttemptID, clock,
- committer);
+ recoveryServ = createRecoveryService(context);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
@@ -425,6 +424,15 @@ public class MRAppMaster extends Composi
return new JobFinishEventHandler();
}
+ /**
+ * Create the recovery service.
+ * @return an instance of the recovery service.
+ */
+ protected Recovery createRecoveryService(AppContext appContext) {
+ return new RecoveryService(appContext.getApplicationAttemptId(),
+ appContext.getClock(), getCommitter());
+ }
+
/** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Dec 1 08:35:20 2011
@@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.Asyn
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -97,8 +95,6 @@ import org.apache.hadoop.yarn.util.Conve
public class RecoveryService extends CompositeService implements Recovery {
- private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId;
@@ -120,7 +116,7 @@ public class RecoveryService extends Com
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
- this.dispatcher = new RecoveryDispatcher();
+ this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
}
@@ -209,17 +205,32 @@ public class RecoveryService extends Com
LOG.info("Read completed tasks from history "
+ completedTasks.size());
}
+
+ protected Dispatcher createRecoveryDispatcher() {
+ return new RecoveryDispatcher();
+ }
+
+ protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
+ return new RecoveryDispatcher(exitOnException);
+ }
+ @SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler;
private final EventHandler handler;
- RecoveryDispatcher() {
+ RecoveryDispatcher(boolean exitOnException) {
+ super(exitOnException);
actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler);
}
+ RecoveryDispatcher() {
+ this(false);
+ }
+
@Override
+ @SuppressWarnings("unchecked")
public void dispatch(Event event) {
if (recoveryMode) {
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
@@ -267,6 +278,10 @@ public class RecoveryService extends Com
}
}
}
+ realDispatch(event);
+ }
+
+ public void realDispatch(Event event) {
super.dispatch(event);
}
@@ -281,6 +296,7 @@ public class RecoveryService extends Com
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
private class InterceptingEventHandler implements EventHandler {
EventHandler actualHandler;
@@ -407,7 +423,9 @@ public class RecoveryService extends Com
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId();
- NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname());
+ NodeId nodeId =
+ ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+ + attemptInfo.getPort());
// Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them
// to null
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Dec 1 08:35:20 2011
@@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@@ -408,6 +413,13 @@ public class TestRecovery {
}
@Override
+ protected Recovery createRecoveryService(AppContext appContext) {
+ return new RecoveryServiceWithCustomDispatcher(
+ appContext.getApplicationAttemptId(), appContext.getClock(),
+ getCommitter());
+ }
+
+ @Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher();
launcher.shufflePort = 5467;
@@ -422,7 +434,22 @@ public class TestRecovery {
return eventHandler;
}
}
-
+
+ class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+
+ public RecoveryServiceWithCustomDispatcher(
+ ApplicationAttemptId applicationAttemptId, Clock clock,
+ OutputCommitter committer) {
+ super(applicationAttemptId, clock, committer);
+ }
+
+ @Override
+ public Dispatcher createRecoveryDispatcher() {
+ return super.createRecoveryDispatcher(false);
+ }
+
+ }
+
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Thu Dec 1 08:35:20 2011
@@ -45,18 +45,25 @@ public class AsyncDispatcher extends Abs
private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
+ private boolean exitOnDispatchException;
public AsyncDispatcher() {
this(new HashMap<Class<? extends Enum>, EventHandler>(),
- new LinkedBlockingQueue<Event>());
+ new LinkedBlockingQueue<Event>(), true);
+ }
+
+ public AsyncDispatcher(boolean exitOnException) {
+ this(new HashMap<Class<? extends Enum>, EventHandler>(),
+ new LinkedBlockingQueue<Event>(), exitOnException);
}
AsyncDispatcher(
Map<Class<? extends Enum>, EventHandler> eventDispatchers,
- BlockingQueue<Event> eventQueue) {
+ BlockingQueue<Event> eventQueue, boolean exitOnException) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = eventDispatchers;
+ this.exitOnDispatchException = exitOnException;
}
Runnable createThread() {
@@ -118,7 +125,9 @@ public class AsyncDispatcher extends Abs
catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread. Exiting..", t);
- System.exit(-1);
+ if (exitOnDispatchException) {
+ System.exit(-1);
+ }
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java Thu Dec 1 08:35:20 2011
@@ -36,7 +36,7 @@ public class DrainDispatcher extends Asy
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
- super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue);
+ super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
this.queue = eventQueue;
}