Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/28 01:32:19 UTC
svn commit: r1294417 [1/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java...
Author: vinodkv
Date: Tue Feb 28 00:32:19 2012
New Revision: 1294417
URL: http://svn.apache.org/viewvc?rev=1294417&view=rev
Log:
MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and task reports so as to improve UI response times. Contributed by Siddarth Seth.
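The heart of the change is a double-checked lazy-load guard: job/task reports and task maps are now built only on first access instead of in the constructor. A minimal sketch of the guard used by CompletedJob.loadAllTasks() and CompletedTask.loadAllTaskAttempts() in the diff below (the class name LazyLoadSketch and the elided map-building step are placeholders; the field names and locking come from the patch):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class LazyLoadSketch {
      private final AtomicBoolean tasksLoaded = new AtomicBoolean(false);
      private final Lock tasksLock = new ReentrantLock();

      private void loadAllTasks() {
        if (tasksLoaded.get()) {
          return;            // fast path: already built, no lock taken
        }
        tasksLock.lock();
        try {
          if (tasksLoaded.get()) {
            return;          // another thread finished the load while we waited
          }
          // ... build the task maps from the parsed JobHistoryParser.JobInfo ...
          tasksLoaded.set(true);
        } finally {
          tasksLock.unlock();
        }
      }
    }

Callers such as getTasks() and the completion-event construction invoke the guard before touching the maps, so a job opened only for its summary never pays the per-task parsing cost.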
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Feb 28 00:32:19 2012
@@ -138,6 +138,9 @@ Release 0.23.2 - UNRELEASED
OPTIMIZATIONS
+ MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
+ task reports so as to improve UI response times. (Siddarth Seth via vinodkv)
+
BUG FIXES
MAPREDUCE-3918 proc_historyserver no longer in command line arguments for
HistoryServer (Jon Eagles via bobby)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Tue Feb 28 00:32:19 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils {
@@ -41,6 +42,11 @@ public class MRBuilderUtils {
return jobId;
}
+ public static JobId newJobId(long clusterTs, int appIdInt, int id) {
+ ApplicationId appId = BuilderUtils.newApplicationId(clusterTs, appIdInt);
+ return MRBuilderUtils.newJobId(appId, id);
+ }
+
public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
TaskId taskId = Records.newRecord(TaskId.class);
taskId.setJobId(jobId);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue Feb 28 00:32:19 2012
@@ -19,13 +19,16 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
-import java.util.ArrayList;
+import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -54,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ut
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
/**
@@ -64,50 +68,31 @@ import org.apache.hadoop.yarn.factory.pr
public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
static final Log LOG = LogFactory.getLog(CompletedJob.class);
- private final Counters counters;
private final Configuration conf;
- private final JobId jobId;
- private final List<String> diagnostics = new ArrayList<String>();
- private final JobReport report;
- private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
- private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
- private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
- private final String user;
+ private final JobId jobId; //Can be picked from JobInfo with a conversion.
+ private final String user; //Can be picked up from JobInfo
private final Path confFile;
- private JobACLsManager aclsMgr;
- private List<TaskAttemptCompletionEvent> completionEvents = null;
private JobInfo jobInfo;
-
+ private JobReport report;
+ AtomicBoolean tasksLoaded = new AtomicBoolean(false);
+ private Lock tasksLock = new ReentrantLock();
+ private Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+ private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+ private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
+ private List<TaskAttemptCompletionEvent> completionEvents = null;
+ private JobACLsManager aclsMgr;
+
+
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr)
throws IOException {
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
this.conf = conf;
this.jobId = jobId;
+ this.user = userName;
this.confFile = confFile;
this.aclsMgr = aclsMgr;
-
loadFullHistoryData(loadTasks, historyFile);
- user = userName;
- counters = jobInfo.getTotalCounters();
- diagnostics.add(jobInfo.getErrorInfo());
- report =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
- JobReport.class);
- report.setJobId(jobId);
- report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
- report.setSubmitTime(jobInfo.getSubmitTime());
- report.setStartTime(jobInfo.getLaunchTime());
- report.setFinishTime(jobInfo.getFinishTime());
- report.setJobName(jobInfo.getJobname());
- report.setUser(jobInfo.getUsername());
- report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
- report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
- report.setJobFile(confFile.toString());
- report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
- .toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
- report.setAMInfos(getAMInfos());
- report.setIsUber(isUber());
}
@Override
@@ -122,7 +107,7 @@ public class CompletedJob implements org
@Override
public Counters getAllCounters() {
- return counters;
+ return jobInfo.getTotalCounters();
}
@Override
@@ -131,10 +116,36 @@ public class CompletedJob implements org
}
@Override
- public JobReport getReport() {
+ public synchronized JobReport getReport() {
+ if (report == null) {
+ constructJobReport();
+ }
return report;
}
+ private void constructJobReport() {
+ report = Records.newRecord(JobReport.class);
+ report.setJobId(jobId);
+ report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
+ report.setSubmitTime(jobInfo.getSubmitTime());
+ report.setStartTime(jobInfo.getLaunchTime());
+ report.setFinishTime(jobInfo.getFinishTime());
+ report.setJobName(jobInfo.getJobname());
+ report.setUser(jobInfo.getUsername());
+ report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
+ report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
+ report.setJobFile(confFile.toString());
+ String historyUrl = "N/A";
+ try {
+ historyUrl = JobHistoryUtils.getHistoryUrl(conf, jobId.getAppId());
+ } catch (UnknownHostException e) {
+ //Ignore.
+ }
+ report.setTrackingUrl(historyUrl);
+ report.setAMInfos(getAMInfos());
+ report.setIsUber(isUber());
+ }
+
@Override
public float getProgress() {
return 1.0f;
@@ -142,16 +153,23 @@ public class CompletedJob implements org
@Override
public JobState getState() {
- return report.getJobState();
+ return JobState.valueOf(jobInfo.getJobStatus());
}
@Override
public Task getTask(TaskId taskId) {
- return tasks.get(taskId);
+ if (tasksLoaded.get()) {
+ return tasks.get(taskId);
+ } else {
+ TaskID oldTaskId = TypeConverter.fromYarn(taskId);
+ CompletedTask completedTask =
+ new CompletedTask(taskId, jobInfo.getAllTasks().get(oldTaskId));
+ return completedTask;
+ }
}
@Override
- public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+ public synchronized TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
if (completionEvents == null) {
constructTaskAttemptCompletionEvents();
@@ -167,6 +185,7 @@ public class CompletedJob implements org
}
private void constructTaskAttemptCompletionEvents() {
+ loadAllTasks();
completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
for (TaskId taskId : tasks.keySet()) {
@@ -205,8 +224,8 @@ public class CompletedJob implements org
int eventId = 0;
for (TaskAttempt taskAttempt : allTaskAttempts) {
- TaskAttemptCompletionEvent tace = RecordFactoryProvider.getRecordFactory(
- null).newRecordInstance(TaskAttemptCompletionEvent.class);
+ TaskAttemptCompletionEvent tace =
+ Records.newRecord(TaskAttemptCompletionEvent.class);
int attemptRunTime = -1;
if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) {
@@ -237,15 +256,42 @@ public class CompletedJob implements org
@Override
public Map<TaskId, Task> getTasks() {
+ loadAllTasks();
return tasks;
}
+ private void loadAllTasks() {
+ if (tasksLoaded.get()) {
+ return;
+ }
+ tasksLock.lock();
+ try {
+ if (tasksLoaded.get()) {
+ return;
+ }
+ for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+ TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+ TaskInfo taskInfo = entry.getValue();
+ Task task = new CompletedTask(yarnTaskID, taskInfo);
+ tasks.put(yarnTaskID, task);
+ if (task.getType() == TaskType.MAP) {
+ mapTasks.put(task.getID(), task);
+ } else if (task.getType() == TaskType.REDUCE) {
+ reduceTasks.put(task.getID(), task);
+ }
+ }
+ tasksLoaded.set(true);
+ } finally {
+ tasksLock.unlock();
+ }
+ }
+
//History data is leisurely loaded when task level data is requested
private synchronized void loadFullHistoryData(boolean loadTasks,
Path historyFileAbsolute) throws IOException {
LOG.info("Loading history file: [" + historyFileAbsolute + "]");
- if (jobInfo != null) {
- return; //data already loaded
+ if (this.jobInfo != null) {
+ return;
}
if (historyFileAbsolute != null) {
@@ -254,7 +300,7 @@ public class CompletedJob implements org
parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
- jobInfo = parser.parse();
+ this.jobInfo = parser.parse();
} catch (IOException e) {
throw new YarnException("Could not load history file "
+ historyFileAbsolute, e);
@@ -268,27 +314,15 @@ public class CompletedJob implements org
} else {
throw new IOException("History file not found");
}
-
if (loadTasks) {
- for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
- .getAllTasks().entrySet()) {
- TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
- TaskInfo taskInfo = entry.getValue();
- Task task = new CompletedTask(yarnTaskID, taskInfo);
- tasks.put(yarnTaskID, task);
- if (task.getType() == TaskType.MAP) {
- mapTasks.put(task.getID(), task);
- } else if (task.getType() == TaskType.REDUCE) {
- reduceTasks.put(task.getID(), task);
- }
- }
- }
- LOG.info("TaskInfo loaded");
+ loadAllTasks();
+ LOG.info("TaskInfo loaded");
+ }
}
@Override
public List<String> getDiagnostics() {
- return diagnostics;
+ return Collections.singletonList(jobInfo.getErrorInfo());
}
@Override
@@ -318,6 +352,7 @@ public class CompletedJob implements org
@Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
+ loadAllTasks();
if (TaskType.MAP.equals(taskType)) {
return mapTasks;
} else {//we have only two types of tasks
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Tue Feb 28 00:32:19 2012
@@ -20,10 +20,13 @@ package org.apache.hadoop.mapreduce.v2.h
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@@ -35,59 +38,24 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
public class CompletedTask implements Task {
-
- private final TaskType type;
- private Counters counters;
- private final long startTime;
- private final long finishTime;
- private TaskState state;
private final TaskId taskId;
- private final TaskReport report;
+ private final TaskInfo taskInfo;
+ private TaskReport report;
+ private TaskAttemptId successfulAttempt;
+ private List<String> reportDiagnostics = new LinkedList<String>();
+ private Lock taskAttemptsLock = new ReentrantLock();
+ private AtomicBoolean taskAttemptsLoaded = new AtomicBoolean(false);
private final Map<TaskAttemptId, TaskAttempt> attempts =
new LinkedHashMap<TaskAttemptId, TaskAttempt>();
-
- private static final Log LOG = LogFactory.getLog(CompletedTask.class);
CompletedTask(TaskId taskId, TaskInfo taskInfo) {
//TODO JobHistoryParser.handleTaskFailedAttempt should use state from the event.
- LOG.debug("HandlingTaskId: [" + taskId + "]");
+ this.taskInfo = taskInfo;
this.taskId = taskId;
- this.startTime = taskInfo.getStartTime();
- this.finishTime = taskInfo.getFinishTime();
- this.type = TypeConverter.toYarn(taskInfo.getTaskType());
- if (taskInfo.getCounters() != null)
- this.counters = taskInfo.getCounters();
- if (taskInfo.getTaskStatus() != null) {
- this.state = TaskState.valueOf(taskInfo.getTaskStatus());
- } else {
- this.state = TaskState.KILLED;
- }
- report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskReport.class);
- for (TaskAttemptInfo attemptHistory : taskInfo.getAllTaskAttempts()
- .values()) {
- CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskId,
- attemptHistory);
- report.addAllDiagnostics(attempt.getDiagnostics()); //TODO TMI?
- attempts.put(attempt.getID(), attempt);
- if (attemptHistory.getTaskStatus() != null
- && attemptHistory.getTaskStatus().equals(
- TaskState.SUCCEEDED.toString())
- && report.getSuccessfulAttempt() == null) {
- report.setSuccessfulAttempt(TypeConverter.toYarn(attemptHistory
- .getAttemptId()));
- }
- }
- report.setTaskId(taskId);
- report.setStartTime(startTime);
- report.setFinishTime(finishTime);
- report.setTaskState(state);
- report.setProgress(getProgress());
- report.setCounters(TypeConverter.toYarn(getCounters()));
- report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
}
@Override
@@ -97,17 +65,19 @@ public class CompletedTask implements Ta
@Override
public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+ loadAllTaskAttempts();
return attempts.get(attemptID);
}
@Override
public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+ loadAllTaskAttempts();
return attempts;
}
@Override
public Counters getCounters() {
- return counters;
+ return taskInfo.getCounters();
}
@Override
@@ -121,13 +91,18 @@ public class CompletedTask implements Ta
}
@Override
- public TaskReport getReport() {
+ public synchronized TaskReport getReport() {
+ if (report == null) {
+ constructTaskReport();
+ }
return report;
}
+
+
@Override
public TaskType getType() {
- return type;
+ return TypeConverter.toYarn(taskInfo.getTaskType());
}
@Override
@@ -137,7 +112,54 @@ public class CompletedTask implements Ta
@Override
public TaskState getState() {
- return state;
+ return taskInfo.getTaskStatus() == null ? TaskState.KILLED : TaskState
+ .valueOf(taskInfo.getTaskStatus());
}
+ private void constructTaskReport() {
+ loadAllTaskAttempts();
+ this.report = Records.newRecord(TaskReport.class);
+ report.setTaskId(taskId);
+ report.setStartTime(taskInfo.getStartTime());
+ report.setFinishTime(taskInfo.getFinishTime());
+ report.setTaskState(getState());
+ report.setProgress(getProgress());
+ report.setCounters(TypeConverter.toYarn(getCounters()));
+ if (successfulAttempt != null) {
+ report.setSuccessfulAttempt(successfulAttempt);
+ }
+ report.addAllDiagnostics(reportDiagnostics);
+ report
+ .addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
+ }
+
+ private void loadAllTaskAttempts() {
+ if (taskAttemptsLoaded.get()) {
+ return;
+ }
+ taskAttemptsLock.lock();
+ try {
+ if (taskAttemptsLoaded.get()) {
+ return;
+ }
+
+ for (TaskAttemptInfo attemptHistory : taskInfo.getAllTaskAttempts()
+ .values()) {
+ CompletedTaskAttempt attempt =
+ new CompletedTaskAttempt(taskId, attemptHistory);
+ reportDiagnostics.addAll(attempt.getDiagnostics());
+ attempts.put(attempt.getID(), attempt);
+ if (successfulAttempt == null
+ && attemptHistory.getTaskStatus() != null
+ && attemptHistory.getTaskStatus().equals(
+ TaskState.SUCCEEDED.toString())) {
+ successfulAttempt =
+ TypeConverter.toYarn(attemptHistory.getAttemptId());
+ }
+ }
+ taskAttemptsLoaded.set(true);
+ } finally {
+ taskAttemptsLock.unlock();
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Tue Feb 28 00:32:19 2012
@@ -30,25 +30,21 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
public class CompletedTaskAttempt implements TaskAttempt {
private final TaskAttemptInfo attemptInfo;
private final TaskAttemptId attemptId;
- private Counters counters;
private final TaskAttemptState state;
- private final TaskAttemptReport report;
private final List<String> diagnostics = new ArrayList<String>();
+ private TaskAttemptReport report;
private String localDiagMessage;
CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
this.attemptInfo = attemptInfo;
this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
- if (attemptInfo.getCounters() != null) {
- this.counters = attemptInfo.getCounters();
- }
if (attemptInfo.getTaskStatus() != null) {
this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
} else {
@@ -56,37 +52,9 @@ public class CompletedTaskAttempt implem
localDiagMessage = "Attmpt state missing from History : marked as KILLED";
diagnostics.add(localDiagMessage);
}
-
if (attemptInfo.getError() != null) {
diagnostics.add(attemptInfo.getError());
}
-
- report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
-
- report.setTaskAttemptId(attemptId);
- report.setTaskAttemptState(state);
- report.setProgress(getProgress());
- report.setStartTime(attemptInfo.getStartTime());
-
- report.setFinishTime(attemptInfo.getFinishTime());
- report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
- report.setSortFinishTime(attemptInfo.getSortFinishTime());
- if (localDiagMessage != null) {
- report.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
- } else {
- report.setDiagnosticInfo(attemptInfo.getError());
- }
-// report.setPhase(attemptInfo.get); //TODO
- report.setStateString(attemptInfo.getState());
- report.setCounters(TypeConverter.toYarn(getCounters()));
- report.setContainerId(attemptInfo.getContainerId());
- if (attemptInfo.getHostname() == null) {
- report.setNodeManagerHost("UNKNOWN");
- } else {
- report.setNodeManagerHost(attemptInfo.getHostname());
- report.setNodeManagerPort(attemptInfo.getPort());
- }
- report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
@Override
@@ -111,7 +79,7 @@ public class CompletedTaskAttempt implem
@Override
public Counters getCounters() {
- return counters;
+ return attemptInfo.getCounters();
}
@Override
@@ -125,7 +93,10 @@ public class CompletedTaskAttempt implem
}
@Override
- public TaskAttemptReport getReport() {
+ public synchronized TaskAttemptReport getReport() {
+ if (report == null) {
+ constructTaskAttemptReport();
+ }
return report;
}
@@ -146,26 +117,55 @@ public class CompletedTaskAttempt implem
@Override
public long getLaunchTime() {
- return report.getStartTime();
+ return attemptInfo.getStartTime();
}
@Override
public long getFinishTime() {
- return report.getFinishTime();
+ return attemptInfo.getFinishTime();
}
@Override
public long getShuffleFinishTime() {
- return report.getShuffleFinishTime();
+ return attemptInfo.getShuffleFinishTime();
}
@Override
public long getSortFinishTime() {
- return report.getSortFinishTime();
+ return attemptInfo.getSortFinishTime();
}
@Override
public int getShufflePort() {
- throw new UnsupportedOperationException("Not supported yet.");
+ return attemptInfo.getShufflePort();
+ }
+
+ private void constructTaskAttemptReport() {
+ report = Records.newRecord(TaskAttemptReport.class);
+
+ report.setTaskAttemptId(attemptId);
+ report.setTaskAttemptState(state);
+ report.setProgress(getProgress());
+ report.setStartTime(attemptInfo.getStartTime());
+ report.setFinishTime(attemptInfo.getFinishTime());
+ report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
+ report.setSortFinishTime(attemptInfo.getSortFinishTime());
+ if (localDiagMessage != null) {
+ report
+ .setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
+ } else {
+ report.setDiagnosticInfo(attemptInfo.getError());
+ }
+ // report.setPhase(attemptInfo.get); //TODO
+ report.setStateString(attemptInfo.getState());
+ report.setCounters(TypeConverter.toYarn(getCounters()));
+ report.setContainerId(attemptInfo.getContainerId());
+ if (attemptInfo.getHostname() == null) {
+ report.setNodeManagerHost("UNKNOWN");
+ } else {
+ report.setNodeManagerHost(attemptInfo.getHostname());
+ report.setNodeManagerPort(attemptInfo.getPort());
+ }
+ report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1294417&r1=1294416&r2=1294417&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Feb 28 00:32:19 2012
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -117,9 +118,8 @@ public class JobHistory extends Abstract
//Maintains a list of known done subdirectories. Not currently used.
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
- private final SortedMap<JobId, Job> loadedJobCache =
- new ConcurrentSkipListMap<JobId, Job>();
+
+ private Map<JobId, Job> loadedJobCache = null;
/**
* Maintains a mapping between intermediate user directories and the last
@@ -167,6 +167,7 @@ public class JobHistory extends Abstract
* .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
*/
+ @SuppressWarnings("serial")
@Override
public void init(Configuration conf) throws YarnException {
LOG.info("JobHistory Init");
@@ -224,6 +225,16 @@ public class JobHistory extends Abstract
DEFAULT_MOVE_THREAD_INTERVAL);
numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
DEFAULT_MOVE_THREAD_COUNT);
+
+ loadedJobCache =
+ Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
+ loadedJobCacheSize + 1, 0.75f, true) {
+ @Override
+ public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
+ return super.size() > loadedJobCacheSize;
+ }
+ });
+
try {
initExisting();
} catch (IOException e) {
@@ -465,9 +476,6 @@ public class JobHistory extends Abstract
LOG.debug("Adding "+job.getID()+" to loaded job cache");
}
loadedJobCache.put(job.getID(), job);
- if (loadedJobCache.size() > loadedJobCacheSize ) {
- loadedJobCache.remove(loadedJobCache.firstKey());
- }
}
@@ -655,7 +663,7 @@ public class JobHistory extends Abstract
synchronized(metaInfo) {
try {
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
- metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
+ metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
metaInfo.getConfFile(), this.aclsMgr);
addToLoadedJobCache(job);
return job;
@@ -938,7 +946,7 @@ public class JobHistory extends Abstract
LOG.debug("Called getAllJobs()");
return getAllJobsInternal();
}
-
+
static class MetaInfo {
private Path historyFile;
private Path confFile;
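The JobHistory.java hunks above also replace the ConcurrentSkipListMap plus manual firstKey() eviction with an access-ordered LinkedHashMap bounded by loadedJobCacheSize; the previous code always removed the entry with the smallest JobId, regardless of how recently it had been used. A standalone sketch of that idiom, with a hypothetical newLruCache helper standing in for the inline anonymous class in init():

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public final class LruCacheSketch {
      // Access-ordered LinkedHashMap: get() and put() move entries to the tail,
      // and removeEldestEntry() drops the least-recently-used entry once the map
      // grows past maxSize. Wrapping it in synchronizedMap keeps it safe for the
      // history server's concurrent readers.
      public static <K, V> Map<K, V> newLruCache(final int maxSize) {
        return Collections.synchronizedMap(
            new LinkedHashMap<K, V>(maxSize + 1, 0.75f, true) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
              }
            });
      }
    }
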
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1294417&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Tue Feb 28 00:32:19 2012
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class TestJobHistoryEntities {
+
+ private final String historyFileName =
+ "job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist";
+ private final String confFileName = "job_1329348432655_0001_conf.xml";
+ private final Configuration conf = new Configuration();
+ private final JobACLsManager jobAclsManager = new JobACLsManager(conf);
+ private boolean loadTasks;
+ private JobId jobId = MRBuilderUtils.newJobId(1329348432655l, 1, 1);
+ Path fulleHistoryPath =
+ new Path(this.getClass().getClassLoader().getResource(historyFileName)
+ .getFile());
+ Path fullConfPath =
+ new Path(this.getClass().getClassLoader().getResource(confFileName)
+ .getFile());
+ private CompletedJob completedJob;
+
+ public TestJobHistoryEntities(boolean loadTasks) throws Exception {
+ this.loadTasks = loadTasks;
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> list = new ArrayList<Object[]>(2);
+ list.add(new Object[] { true });
+ list.add(new Object[] { false });
+ return list;
+ }
+
+ /* Verify some expected values based on the history file */
+ @Test
+ public void testCompletedJob() throws Exception {
+ //Re-initialize to verify the delayed load.
+ completedJob =
+ new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
+ fullConfPath, jobAclsManager);
+ //Verify tasks loaded based on loadTask parameter.
+ assertEquals(loadTasks, completedJob.tasksLoaded.get());
+ assertEquals(1, completedJob.getAMInfos().size());
+ assertEquals(10, completedJob.getCompletedMaps());
+ assertEquals(1, completedJob.getCompletedReduces());
+ assertEquals(11, completedJob.getTasks().size());
+ //Verify tasks loaded at this point.
+ assertEquals(true, completedJob.tasksLoaded.get());
+ assertEquals(10, completedJob.getTasks(TaskType.MAP).size());
+ assertEquals(1, completedJob.getTasks(TaskType.REDUCE).size());
+ assertEquals("user", completedJob.getUserName());
+ assertEquals(JobState.SUCCEEDED, completedJob.getState());
+ JobReport jobReport = completedJob.getReport();
+ assertEquals("user", jobReport.getUser());
+ assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
+ }
+
+ @Test
+ public void testCompletedTask() throws Exception {
+ completedJob =
+ new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
+ fullConfPath, jobAclsManager);
+ TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+
+ Map<TaskId, Task> mapTasks = completedJob.getTasks(TaskType.MAP);
+ Map<TaskId, Task> reduceTasks = completedJob.getTasks(TaskType.REDUCE);
+ assertEquals(10, mapTasks.size());
+ assertEquals(1, reduceTasks.size());
+
+ Task mt1 = mapTasks.get(mt1Id);
+ assertEquals(1, mt1.getAttempts().size());
+ assertEquals(TaskState.SUCCEEDED, mt1.getState());
+ TaskReport mt1Report = mt1.getReport();
+ assertEquals(TaskState.SUCCEEDED, mt1Report.getTaskState());
+ assertEquals(mt1Id, mt1Report.getTaskId());
+ Task rt1 = reduceTasks.get(rt1Id);
+ assertEquals(1, rt1.getAttempts().size());
+ assertEquals(TaskState.SUCCEEDED, rt1.getState());
+ TaskReport rt1Report = rt1.getReport();
+ assertEquals(TaskState.SUCCEEDED, rt1Report.getTaskState());
+ assertEquals(rt1Id, rt1Report.getTaskId());
+ }
+
+ @Test
+ public void testCompletedTaskAttempt() throws Exception {
+ completedJob =
+ new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
+ fullConfPath, jobAclsManager);
+ TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
+ TaskAttemptId rta1Id = MRBuilderUtils.newTaskAttemptId(rt1Id, 0);
+
+ Task mt1 = completedJob.getTask(mt1Id);
+ Task rt1 = completedJob.getTask(rt1Id);
+
+ TaskAttempt mta1 = mt1.getAttempt(mta1Id);
+ assertEquals(TaskAttemptState.SUCCEEDED, mta1.getState());
+ assertEquals("localhost:45454", mta1.getAssignedContainerMgrAddress());
+ assertEquals("localhost:9999", mta1.getNodeHttpAddress());
+ TaskAttemptReport mta1Report = mta1.getReport();
+ assertEquals(TaskAttemptState.SUCCEEDED, mta1Report.getTaskAttemptState());
+ assertEquals("localhost", mta1Report.getNodeManagerHost());
+ assertEquals(45454, mta1Report.getNodeManagerPort());
+ assertEquals(9999, mta1Report.getNodeManagerHttpPort());
+
+ TaskAttempt rta1 = rt1.getAttempt(rta1Id);
+ assertEquals(TaskAttemptState.SUCCEEDED, rta1.getState());
+ assertEquals("localhost:45454", rta1.getAssignedContainerMgrAddress());
+ assertEquals("localhost:9999", rta1.getNodeHttpAddress());
+ TaskAttemptReport rta1Report = rta1.getReport();
+ assertEquals(TaskAttemptState.SUCCEEDED, rta1Report.getTaskAttemptState());
+ assertEquals("localhost", rta1Report.getNodeManagerHost());
+ assertEquals(45454, rta1Report.getNodeManagerPort());
+ assertEquals(9999, rta1Report.getNodeManagerHttpPort());
+ }
+}