You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/11/02 00:07:56 UTC

svn commit: r1404825 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...

Author: jlowe
Date: Thu Nov  1 23:07:55 2012
New Revision: 1404825

URL: http://svn.apache.org/viewvc?rev=1404825&view=rev
Log:
svn merge -c 1404817 FIXES: MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
      - copied unchanged from r1404817, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Nov  1 23:07:55 2012
@@ -65,6 +65,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4746. The MR Application Master does not have a config to set
     environment variables (Rob Parker via bobby)
 
+    MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+    Kumar Vavilapalli via jlowe)
+ 
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Nov  1 23:07:55 2012
@@ -23,14 +23,17 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
@@ -826,16 +833,21 @@ public class MRAppMaster extends Composi
   @Override
   public void start() {
 
+    amInfos = new LinkedList<AMInfo>();
+
     // Pull completedTasks etc from recovery
     if (inRecovery) {
       completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
       amInfos = recoveryServ.getAMInfos();
+    } else {
+      // Not recovering, but still read the AMInfos of the previous attempts
+      // if this is not the first AM generation.
+      if (appAttemptID.getAttemptId() != 1) {
+        amInfos.addAll(readJustAMInfos());
+      }
     }
 
-    // / Create the AMInfo for the current AppMaster
-    if (amInfos == null) {
-      amInfos = new LinkedList<AMInfo>();
-    }
+    // Create an AMInfo for the current AM generation.
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
@@ -893,6 +905,51 @@ public class MRAppMaster extends Composi
     startJobs();
   }
 
+  private List<AMInfo> readJustAMInfos() {
+    List<AMInfo> amInfos = new ArrayList<AMInfo>();
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream =
+          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+            appAttemptID);
+      EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+      // All AMInfos are contiguous. Track when the first AMStartedEvent
+      // appears.
+      boolean amStartedEventsBegan = false;
+
+      HistoryEvent event;
+      while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+        if (event.getEventType() == EventType.AM_STARTED) {
+          if (!amStartedEventsBegan) {
+            // First AMStartedEvent.
+            amStartedEventsBegan = true;
+          }
+          AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+          amInfos.add(MRBuilderUtils.newAMInfo(
+            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+            amStartedEvent.getContainerId(),
+            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+            amStartedEvent.getNodeManagerPort(),
+            amStartedEvent.getNodeManagerHttpPort()));
+        } else if (amStartedEventsBegan) {
+          // This means AMStartedEvents began and this event is a
+          // non-AMStarted event.
+          // No need to continue reading all the other events.
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not parse the old history file. "
+          + "Will not have old AMinfos ", e);
+    } finally {
+      if (inputStream != null) {
+        IOUtils.closeQuietly(inputStream);
+      }
+    }
+    return amInfos;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a 
    * workflow.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Nov  1 23:07:55 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public class RecoveryService extends Com
   }
 
   private void parse() throws IOException {
-    // TODO: parse history file based on startCount
-    String jobName = 
-        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
-        new Path(jobhistoryDir));
-    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
-        getConfig());
-    //read the previous history file
-    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
-    LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    FSDataInputStream in =
+        getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
     Exception parseException = parser.getParseException();
     if (parseException != null) {
-      LOG.info("Got an error parsing job-history file " + historyFile + 
+      LOG.info("Got an error parsing job-history file" + 
           ", ignoring incomplete events.", parseException);
     }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ public class RecoveryService extends Com
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
+
+  public static FSDataInputStream getPreviousJobHistoryFileStream(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    String jobName =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+    Path histDirPath =
+        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    // read the previous history file
+    historyFile =
+        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+          jobName, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    return in;
+  }
   
   protected Dispatcher createRecoveryDispatcher() {
     return new RecoveryDispatcher();