Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/28 01:32:19 UTC

svn commit: r1294417 [1/3] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/ hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java...

Author: vinodkv
Date: Tue Feb 28 00:32:19 2012
New Revision: 1294417

URL: http://svn.apache.org/viewvc?rev=1294417&view=rev
Log:
MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and task reports so as to improve UI response times. Contributed by Siddarth Seth.
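
The gist of the change, for readers skimming the diff below: the history entities keep the parsed JobInfo/TaskInfo around and construct the report records only on first access, instead of eagerly in their constructors. A minimal sketch of the idiom, with hypothetical names rather than the committed code:

    // Sketch only: lazily build an expensive report on first request.
    public class LazyReportHolder {
      private String report; // built on demand, not in the constructor

      // First caller pays the construction cost; later callers reuse it.
      // 'synchronized' keeps two threads from building the report twice.
      public synchronized String getReport() {
        if (report == null) {
          report = buildReport();
        }
        return report;
      }

      private String buildReport() {
        return "expensive-to-build report"; // stand-in for Records.newRecord(...)
      }
    }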

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Feb 28 00:32:19 2012
@@ -138,6 +138,9 @@ Release 0.23.2 - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
+    task reports so as to improve UI response times. (Siddarth Seth via vinodkv)
+
   BUG FIXES
     MAPREDUCE-3918  proc_historyserver no longer in command line arguments for
     HistoryServer (Jon Eagles via bobby)

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Tue Feb 28 00:32:19 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 public class MRBuilderUtils {
@@ -41,6 +42,11 @@ public class MRBuilderUtils {
     return jobId;
   }
 
+  public static JobId newJobId(long clusterTs, int appIdInt, int id) {
+    ApplicationId appId = BuilderUtils.newApplicationId(clusterTs, appIdInt);
+    return MRBuilderUtils.newJobId(appId, id);
+  }
+
   public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
     TaskId taskId = Records.newRecord(TaskId.class);
     taskId.setJobId(jobId);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue Feb 28 00:32:19 2012
@@ -19,13 +19,16 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -54,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
 
 
 /**
@@ -64,50 +68,31 @@ import org.apache.hadoop.yarn.factory.pr
 public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
   
   static final Log LOG = LogFactory.getLog(CompletedJob.class);
-  private final Counters counters;
   private final Configuration conf;
-  private final JobId jobId;
-  private final List<String> diagnostics = new ArrayList<String>();
-  private final JobReport report;
-  private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-  private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
-  private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
-  private final String user;
+  private final JobId jobId; //Can be picked from JobInfo with a conversion.
+  private final String user; //Can be picked up from JobInfo
   private final Path confFile;
-  private JobACLsManager aclsMgr;
-  private List<TaskAttemptCompletionEvent> completionEvents = null;
   private JobInfo jobInfo;
-
+  private JobReport report;
+  AtomicBoolean tasksLoaded = new AtomicBoolean(false);
+  private Lock tasksLock = new ReentrantLock();
+  private Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+  private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+  private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
+  private List<TaskAttemptCompletionEvent> completionEvents = null;
+  private JobACLsManager aclsMgr;
+  
+  
   public CompletedJob(Configuration conf, JobId jobId, Path historyFile, 
       boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr) 
           throws IOException {
     LOG.info("Loading job: " + jobId + " from file: " + historyFile);
     this.conf = conf;
     this.jobId = jobId;
+    this.user = userName;
     this.confFile = confFile;
     this.aclsMgr = aclsMgr;
-    
     loadFullHistoryData(loadTasks, historyFile);
-    user = userName;
-    counters = jobInfo.getTotalCounters();
-    diagnostics.add(jobInfo.getErrorInfo());
-    report =
-        RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
-            JobReport.class);
-    report.setJobId(jobId);
-    report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
-    report.setSubmitTime(jobInfo.getSubmitTime());
-    report.setStartTime(jobInfo.getLaunchTime());
-    report.setFinishTime(jobInfo.getFinishTime());
-    report.setJobName(jobInfo.getJobname());
-    report.setUser(jobInfo.getUsername());
-    report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
-    report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
-    report.setJobFile(confFile.toString());
-    report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
-        .toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
-    report.setAMInfos(getAMInfos());
-    report.setIsUber(isUber());
   }
 
   @Override
@@ -122,7 +107,7 @@ public class CompletedJob implements org
 
   @Override
   public Counters getAllCounters() {
-    return counters;
+    return jobInfo.getTotalCounters();
   }
 
   @Override
@@ -131,10 +116,36 @@ public class CompletedJob implements org
   }
 
   @Override
-  public JobReport getReport() {
+  public synchronized JobReport getReport() {
+    if (report == null) {
+      constructJobReport();
+    }
     return report;
   }
 
+  private void constructJobReport() {
+    report = Records.newRecord(JobReport.class);
+    report.setJobId(jobId);
+    report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
+    report.setSubmitTime(jobInfo.getSubmitTime());
+    report.setStartTime(jobInfo.getLaunchTime());
+    report.setFinishTime(jobInfo.getFinishTime());
+    report.setJobName(jobInfo.getJobname());
+    report.setUser(jobInfo.getUsername());
+    report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
+    report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
+    report.setJobFile(confFile.toString());
+    String historyUrl = "N/A";
+    try {
+      historyUrl = JobHistoryUtils.getHistoryUrl(conf, jobId.getAppId());
+    } catch (UnknownHostException e) {
+      //Ignore.
+    }
+    report.setTrackingUrl(historyUrl);
+    report.setAMInfos(getAMInfos());
+    report.setIsUber(isUber());
+  }
+
   @Override
   public float getProgress() {
     return 1.0f;
@@ -142,16 +153,23 @@ public class CompletedJob implements org
 
   @Override
   public JobState getState() {
-    return report.getJobState();
+    return JobState.valueOf(jobInfo.getJobStatus());
   }
 
   @Override
   public Task getTask(TaskId taskId) {
-    return tasks.get(taskId);
+    if (tasksLoaded.get()) {
+      return tasks.get(taskId);
+    } else {
+      TaskID oldTaskId = TypeConverter.fromYarn(taskId);
+      CompletedTask completedTask =
+          new CompletedTask(taskId, jobInfo.getAllTasks().get(oldTaskId));
+      return completedTask;
+    }
   }
 
   @Override
-  public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+  public synchronized TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       int fromEventId, int maxEvents) {
     if (completionEvents == null) {
       constructTaskAttemptCompletionEvents();
@@ -167,6 +185,7 @@ public class CompletedJob implements org
   }
 
   private void constructTaskAttemptCompletionEvents() {
+    loadAllTasks();
     completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
     List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
     for (TaskId taskId : tasks.keySet()) {
@@ -205,8 +224,8 @@ public class CompletedJob implements org
     int eventId = 0;
     for (TaskAttempt taskAttempt : allTaskAttempts) {
 
-      TaskAttemptCompletionEvent tace = RecordFactoryProvider.getRecordFactory(
-          null).newRecordInstance(TaskAttemptCompletionEvent.class);
+      TaskAttemptCompletionEvent tace =
+          Records.newRecord(TaskAttemptCompletionEvent.class);
 
       int attemptRunTime = -1;
       if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) {
@@ -237,15 +256,42 @@ public class CompletedJob implements org
 
   @Override
   public Map<TaskId, Task> getTasks() {
+    loadAllTasks();
     return tasks;
   }
 
+  private void loadAllTasks() {
+    if (tasksLoaded.get()) {
+      return;
+    }
+    tasksLock.lock();
+    try {
+      if (tasksLoaded.get()) {
+        return;
+      }
+      for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+        TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+        TaskInfo taskInfo = entry.getValue();
+        Task task = new CompletedTask(yarnTaskID, taskInfo);
+        tasks.put(yarnTaskID, task);
+        if (task.getType() == TaskType.MAP) {
+          mapTasks.put(task.getID(), task);
+        } else if (task.getType() == TaskType.REDUCE) {
+          reduceTasks.put(task.getID(), task);
+        }
+      }
+      tasksLoaded.set(true);
+    } finally {
+      tasksLock.unlock();
+    }
+  }
+
   //History data is lazily loaded when task-level data is requested
   private synchronized void loadFullHistoryData(boolean loadTasks,
       Path historyFileAbsolute) throws IOException {
     LOG.info("Loading history file: [" + historyFileAbsolute + "]");
-    if (jobInfo != null) {
-      return; //data already loaded
+    if (this.jobInfo != null) {
+      return;
     }
     
     if (historyFileAbsolute != null) {
@@ -254,7 +300,7 @@ public class CompletedJob implements org
         parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
-        jobInfo = parser.parse();
+        this.jobInfo = parser.parse();
       } catch (IOException e) {
         throw new YarnException("Could not load history file "
             + historyFileAbsolute, e);
@@ -268,27 +314,15 @@ public class CompletedJob implements org
     } else {
       throw new IOException("History file not found");
     }
-    
     if (loadTasks) {
-    for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
-        .getAllTasks().entrySet()) {
-      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
-      TaskInfo taskInfo = entry.getValue();
-      Task task = new CompletedTask(yarnTaskID, taskInfo);
-      tasks.put(yarnTaskID, task);
-      if (task.getType() == TaskType.MAP) {
-        mapTasks.put(task.getID(), task);
-      } else if (task.getType() == TaskType.REDUCE) {
-        reduceTasks.put(task.getID(), task);
-      }
-    }
-    }
-    LOG.info("TaskInfo loaded");
+      loadAllTasks();
+      LOG.info("TaskInfo loaded");
+    }    
   }
 
   @Override
   public List<String> getDiagnostics() {
-    return diagnostics;
+    return Collections.singletonList(jobInfo.getErrorInfo());
   }
 
   @Override
@@ -318,6 +352,7 @@ public class CompletedJob implements org
 
   @Override
   public Map<TaskId, Task> getTasks(TaskType taskType) {
+    loadAllTasks();
     if (TaskType.MAP.equals(taskType)) {
       return mapTasks;
     } else {//we have only two types of tasks

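The tasks map in CompletedJob is now populated through a load-once path: an AtomicBoolean provides a lock-free fast path, and a ReentrantLock with a re-check guards the actual population, so concurrent readers never parse the task data twice. A self-contained sketch of that idiom (illustrative names, not the committed code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Load-once container mirroring the shape of loadAllTasks() above.
    public class LazyTaskMap {
      private final AtomicBoolean loaded = new AtomicBoolean(false);
      private final Lock lock = new ReentrantLock();
      private final Map<Integer, String> tasks = new HashMap<Integer, String>();

      public Map<Integer, String> getTasks() {
        loadAll();
        return tasks;
      }

      private void loadAll() {
        if (loaded.get()) {   // fast path: already populated, no locking
          return;
        }
        lock.lock();
        try {
          if (loaded.get()) { // re-check: another thread won the race
            return;
          }
          tasks.put(1, "task #1"); // stand-in for converting the real TaskInfo
          loaded.set(true);   // set only after the map is fully built
        } finally {
          lock.unlock();
        }
      }
    }
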
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Tue Feb 28 00:32:19 2012
@@ -20,10 +20,13 @@ package org.apache.hadoop.mapreduce.v2.h
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -35,59 +38,24 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
 
 public class CompletedTask implements Task {
 
-
-  private final TaskType type;
-  private Counters counters;
-  private final long startTime;
-  private final long finishTime;
-  private TaskState state;
   private final TaskId taskId;
-  private final TaskReport report;
+  private final TaskInfo taskInfo;
+  private TaskReport report;
+  private TaskAttemptId successfulAttempt;
+  private List<String> reportDiagnostics = new LinkedList<String>();
+  private Lock taskAttemptsLock = new ReentrantLock();
+  private AtomicBoolean taskAttemptsLoaded = new AtomicBoolean(false);
   private final Map<TaskAttemptId, TaskAttempt> attempts =
     new LinkedHashMap<TaskAttemptId, TaskAttempt>();
-  
-  private static final Log LOG = LogFactory.getLog(CompletedTask.class);
 
   CompletedTask(TaskId taskId, TaskInfo taskInfo) {
     //TODO JobHistoryParser.handleTaskFailedAttempt should use state from the event.
-    LOG.debug("HandlingTaskId: [" + taskId + "]");
+    this.taskInfo = taskInfo;
     this.taskId = taskId;
-    this.startTime = taskInfo.getStartTime();
-    this.finishTime = taskInfo.getFinishTime();
-    this.type = TypeConverter.toYarn(taskInfo.getTaskType());
-    if (taskInfo.getCounters() != null)
-      this.counters = taskInfo.getCounters();
-    if (taskInfo.getTaskStatus() != null) {
-      this.state = TaskState.valueOf(taskInfo.getTaskStatus());
-    } else {
-      this.state = TaskState.KILLED;
-    }
-    report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskReport.class);
-    for (TaskAttemptInfo attemptHistory : taskInfo.getAllTaskAttempts()
-        .values()) {
-      CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskId, 
-          attemptHistory);
-      report.addAllDiagnostics(attempt.getDiagnostics()); //TODO TMI?
-      attempts.put(attempt.getID(), attempt);
-      if (attemptHistory.getTaskStatus() != null
-          && attemptHistory.getTaskStatus().equals(
-              TaskState.SUCCEEDED.toString())
-          && report.getSuccessfulAttempt() == null) {
-        report.setSuccessfulAttempt(TypeConverter.toYarn(attemptHistory
-            .getAttemptId()));
-      }
-    }
-    report.setTaskId(taskId);
-    report.setStartTime(startTime);
-    report.setFinishTime(finishTime);
-    report.setTaskState(state);
-    report.setProgress(getProgress());
-    report.setCounters(TypeConverter.toYarn(getCounters()));
-    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
   }
 
   @Override
@@ -97,17 +65,19 @@ public class CompletedTask implements Ta
 
   @Override
   public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+    loadAllTaskAttempts();
     return attempts.get(attemptID);
   }
 
   @Override
   public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+    loadAllTaskAttempts();
     return attempts;
   }
 
   @Override
   public Counters getCounters() {
-    return counters;
+    return taskInfo.getCounters();
   }
 
   @Override
@@ -121,13 +91,18 @@ public class CompletedTask implements Ta
   }
 
   @Override
-  public TaskReport getReport() {
+  public synchronized TaskReport getReport() {
+    if (report == null) {
+      constructTaskReport();
+    }
     return report;
   }
+  
 
+  
   @Override
   public TaskType getType() {
-    return type;
+    return TypeConverter.toYarn(taskInfo.getTaskType());
   }
 
   @Override
@@ -137,7 +112,54 @@ public class CompletedTask implements Ta
 
   @Override
   public TaskState getState() {
-    return state;
+    return taskInfo.getTaskStatus() == null ? TaskState.KILLED : TaskState
+        .valueOf(taskInfo.getTaskStatus());
   }
 
+  private void constructTaskReport() {
+    loadAllTaskAttempts();
+    this.report = Records.newRecord(TaskReport.class);
+    report.setTaskId(taskId);
+    report.setStartTime(taskInfo.getStartTime());
+    report.setFinishTime(taskInfo.getFinishTime());
+    report.setTaskState(getState());
+    report.setProgress(getProgress());
+    report.setCounters(TypeConverter.toYarn(getCounters()));
+    if (successfulAttempt != null) {
+      report.setSuccessfulAttempt(successfulAttempt);
+    }
+    report.addAllDiagnostics(reportDiagnostics);
+    report
+        .addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
+  }
+
+  private void loadAllTaskAttempts() {
+    if (taskAttemptsLoaded.get()) {
+      return;
+    }
+    taskAttemptsLock.lock();
+    try {
+      if (taskAttemptsLoaded.get()) {
+        return;
+      }
+
+      for (TaskAttemptInfo attemptHistory : taskInfo.getAllTaskAttempts()
+          .values()) {
+        CompletedTaskAttempt attempt =
+            new CompletedTaskAttempt(taskId, attemptHistory);
+        reportDiagnostics.addAll(attempt.getDiagnostics());
+        attempts.put(attempt.getID(), attempt);
+        if (successfulAttempt == null
+            && attemptHistory.getTaskStatus() != null
+            && attemptHistory.getTaskStatus().equals(
+                TaskState.SUCCEEDED.toString())) {
+          successfulAttempt =
+              TypeConverter.toYarn(attemptHistory.getAttemptId());
+        }
+      }
+      taskAttemptsLoaded.set(true);
+    } finally {
+      taskAttemptsLock.unlock();
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Tue Feb 28 00:32:19 2012
@@ -30,25 +30,21 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
 
 public class CompletedTaskAttempt implements TaskAttempt {
 
   private final TaskAttemptInfo attemptInfo;
   private final TaskAttemptId attemptId;
-  private Counters counters;
   private final TaskAttemptState state;
-  private final TaskAttemptReport report;
   private final List<String> diagnostics = new ArrayList<String>();
+  private TaskAttemptReport report;
 
   private String localDiagMessage;
 
   CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
     this.attemptInfo = attemptInfo;
     this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
-    if (attemptInfo.getCounters() != null) {
-      this.counters = attemptInfo.getCounters();
-    }
     if (attemptInfo.getTaskStatus() != null) {
       this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
     } else {
@@ -56,37 +52,9 @@ public class CompletedTaskAttempt implem
       localDiagMessage = "Attmpt state missing from History : marked as KILLED";
       diagnostics.add(localDiagMessage);
     }
-    
     if (attemptInfo.getError() != null) {
       diagnostics.add(attemptInfo.getError());
     }
-    
-    report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
-    
-    report.setTaskAttemptId(attemptId);
-    report.setTaskAttemptState(state);
-    report.setProgress(getProgress());
-    report.setStartTime(attemptInfo.getStartTime());
-    
-    report.setFinishTime(attemptInfo.getFinishTime());
-    report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
-    report.setSortFinishTime(attemptInfo.getSortFinishTime());
-    if (localDiagMessage != null) {
-      report.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
-    } else {
-    report.setDiagnosticInfo(attemptInfo.getError());
-    }
-//    report.setPhase(attemptInfo.get); //TODO
-    report.setStateString(attemptInfo.getState());
-    report.setCounters(TypeConverter.toYarn(getCounters()));
-    report.setContainerId(attemptInfo.getContainerId());
-    if (attemptInfo.getHostname() == null) {
-      report.setNodeManagerHost("UNKNOWN");
-    } else {
-      report.setNodeManagerHost(attemptInfo.getHostname());
-      report.setNodeManagerPort(attemptInfo.getPort());
-    }
-    report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
   }
 
   @Override
@@ -111,7 +79,7 @@ public class CompletedTaskAttempt implem
 
   @Override
   public Counters getCounters() {
-    return counters;
+    return attemptInfo.getCounters();
   }
 
   @Override
@@ -125,7 +93,10 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
-  public TaskAttemptReport getReport() {
+  public synchronized TaskAttemptReport getReport() {
+    if (report == null) {
+      constructTaskAttemptReport();
+    }
     return report;
   }
 
@@ -146,26 +117,55 @@ public class CompletedTaskAttempt implem
 
   @Override
   public long getLaunchTime() {
-    return report.getStartTime();
+    return attemptInfo.getStartTime();
   }
 
   @Override
   public long getFinishTime() {
-    return report.getFinishTime();
+    return attemptInfo.getFinishTime();
   }
   
   @Override
   public long getShuffleFinishTime() {
-    return report.getShuffleFinishTime();
+    return attemptInfo.getShuffleFinishTime();
   }
 
   @Override
   public long getSortFinishTime() {
-    return report.getSortFinishTime();
+    return attemptInfo.getSortFinishTime();
   }
 
   @Override
   public int getShufflePort() {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return attemptInfo.getShufflePort();
+  }
+
+  private void constructTaskAttemptReport() {
+    report = Records.newRecord(TaskAttemptReport.class);
+
+    report.setTaskAttemptId(attemptId);
+    report.setTaskAttemptState(state);
+    report.setProgress(getProgress());
+    report.setStartTime(attemptInfo.getStartTime());
+    report.setFinishTime(attemptInfo.getFinishTime());
+    report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
+    report.setSortFinishTime(attemptInfo.getSortFinishTime());
+    if (localDiagMessage != null) {
+      report
+          .setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
+    } else {
+      report.setDiagnosticInfo(attemptInfo.getError());
+    }
+    // report.setPhase(attemptInfo.get); //TODO
+    report.setStateString(attemptInfo.getState());
+    report.setCounters(TypeConverter.toYarn(getCounters()));
+    report.setContainerId(attemptInfo.getContainerId());
+    if (attemptInfo.getHostname() == null) {
+      report.setNodeManagerHost("UNKNOWN");
+    } else {
+      report.setNodeManagerHost(attemptInfo.getHostname());
+      report.setNodeManagerPort(attemptInfo.getPort());
+    }
+    report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
   }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Feb 28 00:32:19 2012
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -117,9 +118,8 @@ public class JobHistory extends Abstract
   
   //Maintains a list of known done subdirectories. Not currently used.
   private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-  
-  private final SortedMap<JobId, Job> loadedJobCache = 
-    new ConcurrentSkipListMap<JobId, Job>();
+
+  private Map<JobId, Job> loadedJobCache = null;
 
   /**
    * Maintains a mapping between intermediate user directories and the last 
@@ -167,6 +167,7 @@ public class JobHistory extends Abstract
    * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
    */
 
+  @SuppressWarnings("serial")
   @Override
   public void init(Configuration conf) throws YarnException {
     LOG.info("JobHistory Init");
@@ -224,6 +225,16 @@ public class JobHistory extends Abstract
             DEFAULT_MOVE_THREAD_INTERVAL);
     numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
         DEFAULT_MOVE_THREAD_COUNT);
+    
+    loadedJobCache =
+        Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
+            loadedJobCacheSize + 1, 0.75f, true) {
+          @Override
+          public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
+            return super.size() > loadedJobCacheSize;
+          }
+        });
+    
     try {
       initExisting();
     } catch (IOException e) {
@@ -465,9 +476,6 @@ public class JobHistory extends Abstract
       LOG.debug("Adding "+job.getID()+" to loaded job cache");
     }
     loadedJobCache.put(job.getID(), job);
-    if (loadedJobCache.size() > loadedJobCacheSize ) {
-      loadedJobCache.remove(loadedJobCache.firstKey());
-    }
   }
   
   
@@ -655,7 +663,7 @@ public class JobHistory extends Abstract
     synchronized(metaInfo) {
       try {
         Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), 
-            metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
+            metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
             metaInfo.getConfFile(), this.aclsMgr);
         addToLoadedJobCache(job);
         return job;
@@ -938,7 +946,7 @@ public class JobHistory extends Abstract
     LOG.debug("Called getAllJobs()");
     return getAllJobsInternal();
   }
-  
+
   static class MetaInfo {
     private Path historyFile;
     private Path confFile; 

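The JobHistory change above replaces the manual "evict firstKey when over size" bookkeeping with an access-ordered LinkedHashMap whose removeEldestEntry hook performs the eviction, wrapped in Collections.synchronizedMap (hence the @SuppressWarnings("serial") on init(), since the anonymous subclass is serializable without a serialVersionUID). A short sketch of that construction, with illustrative names rather than the committed code:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BoundedLruCache {
      // accessOrder=true keeps iteration in least-recently-used-first
      // order, so the "eldest" entry is the LRU eviction candidate.
      public static Map<String, String> create(final int maxSize) {
        return Collections.synchronizedMap(
            new LinkedHashMap<String, String>(maxSize + 1, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxSize; // evict once capacity is exceeded
              }
            });
      }
    }
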
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1294417&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Tue Feb 28 00:32:19 2012
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class TestJobHistoryEntities {
+
+  private final String historyFileName =
+      "job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist";
+  private final String confFileName = "job_1329348432655_0001_conf.xml";
+  private final Configuration conf = new Configuration();
+  private final JobACLsManager jobAclsManager = new JobACLsManager(conf);
+  private boolean loadTasks;
+  private JobId jobId = MRBuilderUtils.newJobId(1329348432655L, 1, 1);
+  Path fullHistoryPath =
+    new Path(this.getClass().getClassLoader().getResource(historyFileName)
+        .getFile());
+  Path fullConfPath =
+    new Path(this.getClass().getClassLoader().getResource(confFileName)
+        .getFile());
+  private CompletedJob completedJob;
+
+  public TestJobHistoryEntities(boolean loadTasks) throws Exception {
+    this.loadTasks = loadTasks;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> list = new ArrayList<Object[]>(2);
+    list.add(new Object[] { true });
+    list.add(new Object[] { false });
+    return list;
+  }
+
+  /* Verify some expected values based on the history file */
+  @Test
+  public void testCompletedJob() throws Exception {
+    //Re-initialize to verify the delayed load.
+    completedJob =
+      new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
+          fullConfPath, jobAclsManager);
+    //Verify tasks loaded based on loadTask parameter.
+    assertEquals(loadTasks, completedJob.tasksLoaded.get());
+    assertEquals(1, completedJob.getAMInfos().size());
+    assertEquals(10, completedJob.getCompletedMaps());
+    assertEquals(1, completedJob.getCompletedReduces());
+    assertEquals(11, completedJob.getTasks().size());
+    //Verify tasks loaded at this point.
+    assertEquals(true, completedJob.tasksLoaded.get());
+    assertEquals(10, completedJob.getTasks(TaskType.MAP).size());
+    assertEquals(1, completedJob.getTasks(TaskType.REDUCE).size());
+    assertEquals("user", completedJob.getUserName());
+    assertEquals(JobState.SUCCEEDED, completedJob.getState());
+    JobReport jobReport = completedJob.getReport();
+    assertEquals("user", jobReport.getUser());
+    assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
+  }
+  
+  @Test
+  public void testCompletedTask() throws Exception {
+    completedJob =
+      new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
+          fullConfPath, jobAclsManager);
+    TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    
+    Map<TaskId, Task> mapTasks = completedJob.getTasks(TaskType.MAP);
+    Map<TaskId, Task> reduceTasks = completedJob.getTasks(TaskType.REDUCE);
+    assertEquals(10, mapTasks.size());
+    assertEquals(1, reduceTasks.size());
+    
+    Task mt1 = mapTasks.get(mt1Id);
+    assertEquals(1, mt1.getAttempts().size());
+    assertEquals(TaskState.SUCCEEDED, mt1.getState());
+    TaskReport mt1Report = mt1.getReport();
+    assertEquals(TaskState.SUCCEEDED, mt1Report.getTaskState());
+    assertEquals(mt1Id, mt1Report.getTaskId());
+    Task rt1 = reduceTasks.get(rt1Id);
+    assertEquals(1, rt1.getAttempts().size());
+    assertEquals(TaskState.SUCCEEDED, rt1.getState());
+    TaskReport rt1Report = rt1.getReport();
+    assertEquals(TaskState.SUCCEEDED, rt1Report.getTaskState());
+    assertEquals(rt1Id, rt1Report.getTaskId());
+  }
+  
+  @Test
+  public void testCompletedTaskAttempt() throws Exception {
+    completedJob =
+      new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
+          fullConfPath, jobAclsManager);
+    TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
+    TaskAttemptId rta1Id = MRBuilderUtils.newTaskAttemptId(rt1Id, 0);
+    
+    Task mt1 = completedJob.getTask(mt1Id);
+    Task rt1 = completedJob.getTask(rt1Id);
+    
+    TaskAttempt mta1 = mt1.getAttempt(mta1Id);
+    assertEquals(TaskAttemptState.SUCCEEDED, mta1.getState());
+    assertEquals("localhost:45454", mta1.getAssignedContainerMgrAddress());
+    assertEquals("localhost:9999", mta1.getNodeHttpAddress());
+    TaskAttemptReport mta1Report = mta1.getReport();
+    assertEquals(TaskAttemptState.SUCCEEDED, mta1Report.getTaskAttemptState());
+    assertEquals("localhost", mta1Report.getNodeManagerHost());
+    assertEquals(45454, mta1Report.getNodeManagerPort());
+    assertEquals(9999, mta1Report.getNodeManagerHttpPort());
+    
+    TaskAttempt rta1 = rt1.getAttempt(rta1Id);
+    assertEquals(TaskAttemptState.SUCCEEDED, rta1.getState());
+    assertEquals("localhost:45454", rta1.getAssignedContainerMgrAddress());
+    assertEquals("localhost:9999", rta1.getNodeHttpAddress());
+    TaskAttemptReport rta1Report = rta1.getReport();
+    assertEquals(TaskAttemptState.SUCCEEDED, rta1Report.getTaskAttemptState());
+    assertEquals("localhost", rta1Report.getNodeManagerHost());
+    assertEquals(45454, rta1Report.getNodeManagerPort());
+    assertEquals(9999, rta1Report.getNodeManagerHttpPort());
+  }
+}