Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/12/01 09:35:21 UTC

svn commit: r1208994 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...

Author: mahadev
Date: Thu Dec  1 08:35:20 2011
New Revision: 1208994

URL: http://svn.apache.org/viewvc?rev=1208994&view=rev
Log:
MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
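
For context on the root cause, see the RecoveryService hunk further down: it stops handing a bare hostname to ConverterUtils.toNodeId and appends the port recorded in the job history. The standalone sketch below is illustrative only and is not part of this commit (the class name and the hostname/port values are made up); it simply shows why the old form failed with java.lang.IllegalArgumentException while the second AM replayed completed attempts.

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.ConverterUtils;

/** Illustrative sketch only; not part of this commit. */
public class NodeIdRecoverySketch {
  public static void main(String[] args) {
    // Stand-ins for the attemptInfo.getHostname()/getPort() values that
    // RecoveryService reads back from the job history file.
    String hostname = "node1.example.com";
    int port = 45454;

    try {
      // Old code path: a bare hostname is not a "host:port" string, so
      // ConverterUtils.toNodeId rejects it and recovery of the job dies.
      ConverterUtils.toNodeId(hostname);
    } catch (IllegalArgumentException e) {
      System.out.println("bare hostname rejected: " + e);
    }

    // Fixed code path: append the recorded port before parsing.
    NodeId nodeId = ConverterUtils.toNodeId(hostname + ":" + port);
    System.out.println("recovered node: " + nodeId);
  }
}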

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Dec  1 08:35:20 2011
@@ -204,6 +204,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3488. Streaming jobs are failing because the main class
     isnt set in the pom files. (mahadev)
+ 
+    MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with
+    java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
 
 Release 0.23.0 - 2011-11-01 
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Dec  1 08:35:20 2011
@@ -217,8 +217,7 @@ public class MRAppMaster extends Composi
         && appAttemptID.getAttemptId() > 1) {
       LOG.info("Recovery is enabled. "
           + "Will try to recover from previous life on best effort basis.");
-      recoveryServ = new RecoveryService(appAttemptID, clock, 
-          committer);
+      recoveryServ = createRecoveryService(context);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
@@ -425,6 +424,15 @@ public class MRAppMaster extends Composi
     return new JobFinishEventHandler();
   }
 
+  /**
+   * Create the recovery service.
+   * @return an instance of the recovery service.
+   */
+  protected Recovery createRecoveryService(AppContext appContext) {
+    return new RecoveryService(appContext.getApplicationAttemptId(),
+        appContext.getClock(), getCommitter());
+  }
+
   /** Create and initialize (but don't start) a single job. */
   protected Job createJob(Configuration conf) {
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Dec  1 08:35:20 2011
@@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.Asyn
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -97,8 +95,6 @@ import org.apache.hadoop.yarn.util.Conve
 
 public class RecoveryService extends CompositeService implements Recovery {
 
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
@@ -120,7 +116,7 @@ public class RecoveryService extends Com
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
-    this.dispatcher = new RecoveryDispatcher();
+    this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
   }
@@ -209,17 +205,32 @@ public class RecoveryService extends Com
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
+  
+  protected Dispatcher createRecoveryDispatcher() {
+    return new RecoveryDispatcher();
+  }
+  
+  protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
+    return new RecoveryDispatcher(exitOnException);
+  }
 
+  @SuppressWarnings("rawtypes")
   class RecoveryDispatcher extends AsyncDispatcher {
     private final EventHandler actualHandler;
     private final EventHandler handler;
 
-    RecoveryDispatcher() {
+    RecoveryDispatcher(boolean exitOnException) {
+      super(exitOnException);
       actualHandler = super.getEventHandler();
       handler = new InterceptingEventHandler(actualHandler);
     }
 
+    RecoveryDispatcher() {
+      this(false);
+    }
+
     @Override
+    @SuppressWarnings("unchecked")
     public void dispatch(Event event) {
       if (recoveryMode) {
         if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
@@ -267,6 +278,10 @@ public class RecoveryService extends Com
           }
         }
       }
+      realDispatch(event);
+    }
+    
+    public void realDispatch(Event event) {
       super.dispatch(event);
     }
 
@@ -281,6 +296,7 @@ public class RecoveryService extends Com
     return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
   }
 
+  @SuppressWarnings({"rawtypes", "unchecked"})
   private class InterceptingEventHandler implements EventHandler {
     EventHandler actualHandler;
 
@@ -407,7 +423,9 @@ public class RecoveryService extends Com
       LOG.info("Sending assigned event to " + yarnAttemptID);
       ContainerId cId = attemptInfo.getContainerId();
 
-      NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname());
+      NodeId nodeId =
+          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+              + attemptInfo.getPort());
       // Resource/Priority/ApplicationACLs are only needed while launching the
       // container on an NM, these are already completed tasks, so setting them
       // to null

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Thu Dec  1 08:35:20 2011
@@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
+import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -408,6 +413,13 @@ public class TestRecovery {
     }
 
     @Override
+    protected Recovery createRecoveryService(AppContext appContext) {
+      return new RecoveryServiceWithCustomDispatcher(
+          appContext.getApplicationAttemptId(), appContext.getClock(),
+          getCommitter());
+    }
+
+    @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
       MockContainerLauncher launcher = new MockContainerLauncher();
       launcher.shufflePort = 5467;
@@ -422,7 +434,22 @@ public class TestRecovery {
       return eventHandler;
     }
   }
-  
+
+  class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+
+    public RecoveryServiceWithCustomDispatcher(
+        ApplicationAttemptId applicationAttemptId, Clock clock,
+        OutputCommitter committer) {
+      super(applicationAttemptId, clock, committer);
+    }
+
+    @Override
+    public Dispatcher createRecoveryDispatcher() {
+      return super.createRecoveryDispatcher(false);
+    }
+
+  }
+
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();
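
Reading the test changes above together with the rest of the commit: the MRAppMaster test subclass overrides createRecoveryService() to return RecoveryServiceWithCustomDispatcher, which overrides createRecoveryDispatcher() to call the new two-argument variant with exitOnException=false; that flag is handed to the AsyncDispatcher constructor below, where it decides whether a Throwable escaping dispatch still ends in System.exit(-1).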

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Thu Dec  1 08:35:20 2011
@@ -45,18 +45,25 @@ public class AsyncDispatcher extends Abs
 
   private Thread eventHandlingThread;
   protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
+  private boolean exitOnDispatchException;
 
   public AsyncDispatcher() {
     this(new HashMap<Class<? extends Enum>, EventHandler>(),
-         new LinkedBlockingQueue<Event>());
+         new LinkedBlockingQueue<Event>(), true);
+  }
+  
+  public AsyncDispatcher(boolean exitOnException) {
+    this(new HashMap<Class<? extends Enum>, EventHandler>(),
+         new LinkedBlockingQueue<Event>(), exitOnException);
   }
 
   AsyncDispatcher(
       Map<Class<? extends Enum>, EventHandler> eventDispatchers,
-      BlockingQueue<Event> eventQueue) {
+      BlockingQueue<Event> eventQueue, boolean exitOnException) {
     super("Dispatcher");
     this.eventQueue = eventQueue;
     this.eventDispatchers = eventDispatchers;
+    this.exitOnDispatchException = exitOnException;
   }
 
   Runnable createThread() {
@@ -118,7 +125,9 @@ public class AsyncDispatcher extends Abs
     catch (Throwable t) {
       //TODO Maybe log the state of the queue
       LOG.fatal("Error in dispatcher thread. Exiting..", t);
-      System.exit(-1);
+      if (exitOnDispatchException) {
+        System.exit(-1);
+      }
     }
   }
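
As a usage note on the flag added above: exitOnDispatchException only guards the System.exit(-1) call, so a dispatcher built with AsyncDispatcher(false) logs the fatal error and keeps its dispatch thread running when a handler throws. The sketch below is illustrative only and not part of this commit; the SketchEventType enum, SketchEvent class and the deliberately failing handler are made up, while the AsyncDispatcher(boolean) constructor, register(), getEventHandler() and the init/start/stop lifecycle are the existing API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

/** Illustrative sketch only; not part of this commit. */
public class NonExitingDispatcherSketch {

  // Hypothetical event type, just to have something to dispatch.
  enum SketchEventType { PING }

  static class SketchEvent extends AbstractEvent<SketchEventType> {
    SketchEvent() {
      super(SketchEventType.PING);
    }
  }

  public static void main(String[] args) throws Exception {
    // exitOnException = false: an exception escaping a handler is logged
    // as fatal but System.exit(-1) is skipped, so the JVM stays alive.
    AsyncDispatcher dispatcher = new AsyncDispatcher(false);
    dispatcher.register(SketchEventType.class, new EventHandler<SketchEvent>() {
      @Override
      public void handle(SketchEvent event) {
        throw new RuntimeException("boom");
      }
    });
    dispatcher.init(new Configuration());
    dispatcher.start();

    // Enqueue an event whose handler blows up in the dispatch thread.
    dispatcher.getEventHandler().handle(new SketchEvent());

    Thread.sleep(100);  // give the dispatch thread a moment to fail
    dispatcher.stop();  // still reachable; with the no-arg constructor the
                        // dispatch thread would have exited the JVM instead
  }
}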
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java?rev=1208994&r1=1208993&r2=1208994&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java Thu Dec  1 08:35:20 2011
@@ -36,7 +36,7 @@ public class DrainDispatcher extends Asy
   }
 
   private DrainDispatcher(BlockingQueue<Event> eventQueue) {
-    super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue);
+    super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
     this.queue = eventQueue;
   }