You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/02/02 00:33:59 UTC

svn commit: r1239402 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java...

Author: sseth
Date: Wed Feb  1 23:33:58 2012
New Revision: 1239402

URL: http://svn.apache.org/viewvc?rev=1239402&view=rev
Log:
MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. (Contributed by Arun C Murthy)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb  1 23:33:58 2012
@@ -648,6 +648,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3427. Fix streaming unit tests broken after mavenization.
     (Hitesh Shah via acmurthy) 
 
+    MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
+    (Arun C Murthy via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Feb  1 23:33:58 2012
@@ -191,6 +191,11 @@ public class RecoveryService extends Com
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file " + historyFile + 
+          ", ignoring incomplete events.", parseException);
+    }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb  1 23:33:58 2012
@@ -24,8 +24,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,9 +55,13 @@ import org.apache.hadoop.yarn.api.record
 @InterfaceStability.Unstable
 public class JobHistoryParser {
 
+  private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
+  
   private final FSDataInputStream in;
-  JobInfo info = null;
+  private JobInfo info = null;
 
+  private IOException parseException = null;
+  
   /**
    * Create a job history parser for the given history file using the 
    * given file system
@@ -91,30 +98,58 @@ public class JobHistoryParser {
    * The first invocation will populate the object, subsequent calls
    * will return the already parsed object. 
    * The input stream is closed on return 
+   * 
+   * This API ignores partial records and stops parsing on encountering one.
+   * {@link #getParseException()} can be used to fetch the exception, if any.
+   * 
    * @return The populated jobInfo object
    * @throws IOException
+   * @see #getParseException()
    */
   public synchronized JobInfo parse() throws IOException {
+    return parse(new EventReader(in)); 
+  }
+
+  /**
+   * Only used for unit tests.
+   */
+  @Private
+  public synchronized JobInfo parse(EventReader reader) throws IOException {
 
     if (info != null) {
       return info;
     }
 
-    EventReader reader = new EventReader(in);
+    info = new JobInfo();
 
+    int eventCtr = 0;
     HistoryEvent event;
-    info = new JobInfo();
     try {
       while ((event = reader.getNextEvent()) != null) {
         handleEvent(event);
-      }
+        ++eventCtr;
+      } 
+    } catch (IOException ioe) {
+      LOG.info("Caught exception parsing history file after " + eventCtr + 
+          " events", ioe);
+      parseException = ioe;
     } finally {
       in.close();
     }
     return info;
   }
   
-  private void handleEvent(HistoryEvent event) throws IOException { 
+  /**
+   * Get the parse exception, if any.
+   * 
+   * @return the parse exception, or null if none was recorded
+   * @see #parse()
+   */
+  public synchronized IOException getParseException() {
+    return parseException;
+  }
+  
+  private void handleEvent(HistoryEvent event)  { 
     EventType type = event.getEventType();
 
     switch (type) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Wed Feb  1 23:33:58 2012
@@ -249,8 +249,9 @@ public class CompletedJob implements org
     }
     
     if (historyFileAbsolute != null) {
+      JobHistoryParser parser = null;
       try {
-        JobHistoryParser parser =
+        parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
         jobInfo = parser.parse();
@@ -258,6 +259,12 @@ public class CompletedJob implements org
         throw new YarnException("Could not load history file "
             + historyFileAbsolute, e);
       }
+      IOException parseException = parser.getParseException(); 
+      if (parseException != null) {
+        throw new YarnException(
+            "Could not parse history file " + historyFileAbsolute, 
+            parseException);
+      }
     } else {
       throw new IOException("History file not found");
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Feb  1 23:33:58 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -37,14 +38,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,6 +66,9 @@ import org.apache.hadoop.yarn.service.Se
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
@@ -76,6 +84,17 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsing() throws Exception {
+    checkHistoryParsing(2, 1, 2);
+  }
+  
+  @Test
+  public void testHistoryParsingWithParseErrors() throws Exception {
+    checkHistoryParsing(3, 0, 2);
+  }
+  
+  private void checkHistoryParsing(final int numMaps, final int numReduces,
+      final int numSuccessfulMaps) 
+  throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
@@ -83,8 +102,9 @@ public class TestJobHistoryParsing {
         CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
-        true);
+    MRApp app = 
+        new MRAppWithHistory(numMaps, numReduces, true, 
+            this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
@@ -117,8 +137,42 @@ public class TestJobHistoryParsing {
     }
 
     JobHistoryParser parser = new JobHistoryParser(in);
-    JobInfo jobInfo = parser.parse();
-
+    final EventReader realReader = new EventReader(in);
+    EventReader reader = Mockito.mock(EventReader.class);
+    if (numMaps == numSuccessfulMaps) {
+      reader = realReader;
+    } else {
+      final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
+      Mockito.when(reader.getNextEvent()).thenAnswer(
+          new Answer<HistoryEvent>() {
+            public HistoryEvent answer(InvocationOnMock invocation) 
+                throws IOException {
+              HistoryEvent event = realReader.getNextEvent();
+              if (event instanceof TaskFinishedEvent) {
+                numFinishedEvents.incrementAndGet();
+              }
+              
+              if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                return event;
+              } else {
+                throw new IOException("test");
+              }
+            }
+          }
+        );
+    }
+    
+    JobInfo jobInfo = parser.parse(reader);
+    
+    long numFinishedMaps = 
+        computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+    
+    if (numFinishedMaps != numMaps) {
+      Exception parseException = parser.getParseException();
+      Assert.assertNotNull("Didn't get expected parse exception", 
+          parseException);
+    }
+    
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
         jobInfo.getUsername());
     Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -126,14 +180,16 @@ public class TestJobHistoryParsing {
         jobInfo.getJobQueueName());
     Assert
         .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ", 1,
+    Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, 
+        numFinishedMaps);
+    Assert.assertEquals("incorrect finishedReduces ", numReduces,
         jobInfo.getFinishedReduces());
     Assert.assertEquals("incorrect uberized ", job.isUber(),
         jobInfo.getUberized());
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     int totalTasks = allTasks.size();
-    Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
+    Assert.assertEquals("total number of tasks is incorrect  ", 
+        (numMaps+numReduces), totalTasks);
 
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -172,55 +228,78 @@ public class TestJobHistoryParsing {
         Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
         Assert.assertEquals("Incorrect shuffle port for task attempt",
             taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
-        Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
-        Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-
-        // Verify rack-name
-        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-            .getRackname(), RACK_NAME);
+        if (numMaps == numSuccessfulMaps) {
+          Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
+          Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
+          
+          // Verify rack-name
+          Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+              .getRackname(), RACK_NAME);
+        }
       }
     }
 
-    String summaryFileName = JobHistoryUtils
-        .getIntermediateSummaryFileName(jobId);
-    Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-    String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-    Assert.assertNotNull(jobSummaryString);
-
-    Map<String, String> jobSummaryElements = new HashMap<String, String>();
-    StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-    while (strToken.hasMoreTokens()) {
-      String keypair = strToken.nextToken();
-      jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+    if (numMaps == numSuccessfulMaps) {
 
-    }
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+      Assert.assertNotNull(jobSummaryString);
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
 
-    Assert.assertEquals("JobId does not match", jobId.toString(),
-        jobSummaryElements.get("jobId"));
-    Assert.assertTrue("submitTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-    Assert.assertTrue("launchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-    Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-    Assert
-        .assertTrue(
-            "firstReduceTaskLaunchTime should not be 0",
-            Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-    Assert.assertTrue("finishTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-    Assert.assertEquals("Mismatch in num map slots", 2,
-        Integer.parseInt(jobSummaryElements.get("numMaps")));
-    Assert.assertEquals("Mismatch in num reduce slots", 1,
-        Integer.parseInt(jobSummaryElements.get("numReduces")));
-    Assert.assertEquals("User does not match", System.getProperty("user.name"),
-        jobSummaryElements.get("user"));
-    Assert.assertEquals("Queue does not match", "default",
-        jobSummaryElements.get("queue"));
-    Assert.assertEquals("Status does not match", "SUCCEEDED",
-        jobSummaryElements.get("status"));
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+      .assertTrue(
+          "firstReduceTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
+  }
+  
+  // Computes finished maps similar to RecoveryService...
+  private long computeFinishedMaps(JobInfo jobInfo, 
+      int numMaps, int numSuccessfulMaps) {
+    if (numMaps == numSuccessfulMaps) {
+      return jobInfo.getFinishedMaps();
+    }
+    
+    long numFinishedMaps = 0;
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = 
+        jobInfo.getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        ++numFinishedMaps;
+      }
+    }
+    return numFinishedMaps;
   }
   
   @Test
@@ -264,6 +343,9 @@ public class TestJobHistoryParsing {
 
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an expected exception " + parseException, 
+        parseException);
     int noOffailedAttempts = 0;
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     for (Task task : job.getTasks().values()) {