You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/03/04 06:25:59 UTC

svn commit: r1296749 - in /hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Author: vinodkv
Date: Sun Mar  4 05:25:59 2012
New Revision: 1296749

URL: http://svn.apache.org/viewvc?rev=1296749&view=rev
Log:
MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct final state to the RM when it is killed. Contributed by Ravi Prakash.
svn merge --ignore-ancestry -c 1296747 ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt Sun Mar  4 05:25:59 2012
@@ -26,6 +26,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-2793. Corrected AppIDs, JobIDs, TaskAttemptIDs to be of correct
     format on the web pages. (Bikas Saha via vinodkv)
 
+    MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct
+    final state to the RM when it is killed. (Ravi Prakash via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and

Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Sun Mar  4 05:25:59 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -95,9 +96,12 @@ public class JobHistoryEventHandler exte
   private static final Log LOG = LogFactory.getLog(
       JobHistoryEventHandler.class);
 
-  private static final Map<JobId, MetaInfo> fileMap =
+  protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
 
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
@@ -314,7 +318,30 @@ public class JobHistoryEventHandler exte
       LOG.info("In stop, writing event " + ev.getType());
       handleEvent(ev);
     }
-    
+
+    // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
+    // closed their event writers
+    Iterator<JobId> jobIt = fileMap.keySet().iterator();
+    if(isSignalled) {
+      while (jobIt.hasNext()) {
+        JobId toClose = jobIt.next();
+        MetaInfo mi = fileMap.get(toClose);
+        if(mi != null && mi.isWriterActive()) {
+          LOG.warn("Found jobId " + toClose
+            + " to have not been closed. Will close");
+          //Create a JobFinishEvent so that it is written to the job history
+          JobUnsuccessfulCompletionEvent jucEvent =
+            new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
+              System.currentTimeMillis(), context.getJob(toClose)
+              .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
+              JobState.KILLED.toString());
+          JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
+          //Bypass the queue mechanism which might wait. Call the method directly
+          handleEvent(jfEvent);
+        }
+      }
+    }
+
     //close all file handles
     for (MetaInfo mi : fileMap.values()) {
       try {
@@ -710,7 +737,7 @@ public class JobHistoryEventHandler exte
     }
   }
 
-  private class MetaInfo {
+  protected class MetaInfo {
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
@@ -880,4 +907,10 @@ public class JobHistoryEventHandler exte
     //TODO. Some error checking here.
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
+  // Records whether a signal (SIGTERM etc) arrived; consulted during stop().
+  public void setSignalled(boolean signalled) {
+    isSignalled = signalled;
+    LOG.info("JobHistoryEventHandler notified that isSignalled was "
+      + signalled);
+  }
 }

Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sun Mar  4 05:25:59 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -155,6 +156,7 @@ public class MRAppMaster extends Composi
   private boolean newApiCommitter;
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
+  private JobHistoryEventHandler jobHistoryEventHandler;
   private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
 
@@ -502,9 +504,9 @@ public class MRAppMaster extends Composi
 
   protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
       AppContext context) {
-    JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, 
-        getStartCount());
-    return eventHandler;
+    jobHistoryEventHandler =
+      new JobHistoryEventHandler(context, getStartCount());
+    return jobHistoryEventHandler;
   }
 
   protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -659,6 +661,10 @@ public class MRAppMaster extends Composi
     public void handle(ContainerAllocatorEvent event) {
       this.containerAllocator.handle(event);
     }
+    // Forwards the flag to the wrapped allocator (cast to RMCommunicator).
+    public void setSignalled(boolean isSignalled) {
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
   }
 
   /**
@@ -957,12 +963,16 @@ public class MRAppMaster extends Composi
               Integer.parseInt(nodePortString),
               Integer.parseInt(nodeHttpPortString), appSubmitTime);
       Runtime.getRuntime().addShutdownHook(
-          new CompositeServiceShutdownHook(appMaster));
+        new MRAppMasterShutdownHook(appMaster));
       YarnConfiguration conf = new YarnConfiguration(new JobConf());
       conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
       conf.set(MRJobConfig.USER_NAME, jobUserName);
+      // Do not automatically close FileSystem objects, so that on SIGTERM the
+      // shutdown hook still has a chance to write out the job history; the
+      // hook closes them itself via FileSystem.closeAll().
+      conf.setBoolean("fs.automatic.close", false);
       initAndStartAppMaster(appMaster, conf, jobUserName);
     } catch (Throwable t) {
       LOG.fatal("Error starting MRAppMaster", t);
@@ -970,6 +980,35 @@ public class MRAppMaster extends Composi
     }
   }
 
+  // Shutdown hook that runs on a signal (SIGTERM etc) AND on normal JVM exit.
+  // Flags the services so their stop() is quick, then closes FileSystems.
+  static class MRAppMasterShutdownHook extends Thread {
+    MRAppMaster appMaster;
+    MRAppMasterShutdownHook(MRAppMaster appMaster) {
+      this.appMaster = appMaster;
+    }
+    public void run() {
+      LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+        + "JobHistoryEventHandler.");
+      if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
+        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+        .setSignalled(true);
+      }
+      if(appMaster.jobHistoryEventHandler != null) {
+        appMaster.jobHistoryEventHandler.setSignalled(true);
+      }
+      try {
+        appMaster.stop();
+      } finally { // fs.automatic.close is off, so always close FileSystems
+        try {
+          FileSystem.closeAll();
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close all FileSystem objects", ioe);
+        }
+      }
+    }
+  }
+
   protected static void initAndStartAppMaster(final MRAppMaster appMaster,
       final YarnConfiguration conf, String jobUserName) throws IOException,
       InterruptedException {

Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sun Mar  4 05:25:59 2012
@@ -81,6 +81,8 @@ public abstract class RMCommunicator ext
 
   private final AppContext context;
   private Job job;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -158,7 +160,8 @@ public abstract class RMCommunicator ext
       FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
       if (job.getState() == JobState.SUCCEEDED) {
         finishState = FinalApplicationStatus.SUCCEEDED;
-      } else if (job.getState() == JobState.KILLED) {
+      } else if (job.getState() == JobState.KILLED
+          || (job.getState() == JobState.RUNNING && isSignalled)) {
         finishState = FinalApplicationStatus.KILLED;
       } else if (job.getState() == JobState.FAILED
           || job.getState() == JobState.ERROR) {
@@ -278,4 +281,9 @@ public abstract class RMCommunicator ext
   }
 
   protected abstract void heartbeat() throws Exception;
+  // Records the signal flag consulted when choosing the final app status.
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("RMCommunicator notified that isSignalled was : " + isSignalled);
+  }
 }

Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Sun Mar  4 05:25:59 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -48,6 +50,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 
@@ -277,6 +281,68 @@ public class TestJobHistoryEventHandler 
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     AppContext mockAppContext = mockAppContext(jobId);
   }
+  // Builds a mocked JOB_STATUS_CHANGED event bound to the given job.
+  private JobHistoryEvent getEventToEnqueue(JobId jobId) {
+    HistoryEvent he = Mockito.mock(HistoryEvent.class);
+    Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
+    JobHistoryEvent event = Mockito.mock(JobHistoryEvent.class);
+    Mockito.when(event.getJobID()).thenReturn(jobId);
+    Mockito.when(event.getHistoryEvent()).thenReturn(he);
+    return event;
+  }
+
+  @Test
+  /**
+   * Tests that when signalled (SIGTERM etc), stop() still handles the queued
+   * events and then handles a JobUnsuccessfulCompletionEvent for each job
+   * whose writer is still active (so that it may show up in the
+   * JobHistoryServer). Without a signal only the queued events are handled.
+   */
+  public void testSigTermedFunctionality() throws IOException {
+    AppContext mockedContext = Mockito.mock(AppContext.class);
+    JHEventHandlerForSigtermTest jheh =
+      new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    JobId jobId = Mockito.mock(JobId.class);
+    jheh.addToFileMap(jobId);
+
+    //Submit 4 events and check that they're handled in the absence of a signal
+    final int numEvents = 4;
+    JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    // Unsignalled: only the 4 queued events were handled, no finish event
+    assertTrue("handleEvent should've been called only 4 times but was "
+      + jheh.eventsHandled, jheh.eventsHandled == 4);
+
+    //Create a new jheh because the last stop closed the eventWriter etc.
+    jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    // Make constructor of JobUnsuccessfulCompletionEvent pass
+    Job job = Mockito.mock(Job.class);
+    Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
+    // Make TypeConverter(JobID) pass
+    ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
+    Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
+    Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
+
+    jheh.addToFileMap(jobId);
+    jheh.setSignalled(true);
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    // Signalled: the 4 queued events plus 1 synthesized KILLED finish event
+    assertTrue("handleEvent should've been called only 5 times but was "
+        + jheh.eventsHandled, jheh.eventsHandled == 5);
+    assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+        jheh.lastEventHandled.getHistoryEvent()
+        instanceof JobUnsuccessfulCompletionEvent);
+  }
 }
 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
@@ -307,4 +373,28 @@ class JHEvenHandlerForTest extends JobHi
   public EventWriter getEventWriter() {
     return this.eventWriter;
   }
+}
+
+/**
+ * Handler stub for testSigTermedFunctionality that only counts handled events.
+ */
+class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
+  JobHistoryEvent lastEventHandled;
+  int eventsHandled = 0;
+  private MetaInfo metaInfo;
+  public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  // Registers a mocked MetaInfo whose writer reports itself as active.
+  public void addToFileMap(JobId jobId) {
+    metaInfo = Mockito.mock(MetaInfo.class);
+    Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
+    fileMap.put(jobId, metaInfo);
+  }
+  @Override
+  protected void handleEvent(JobHistoryEvent event) {
+    eventsHandled++;
+    lastEventHandled = event;
+  }
+}
\ No newline at end of file