Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/09/16 01:07:52 UTC
svn commit: r1171315 [2/4] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-a...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Sep 15 23:07:48 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.h
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -36,8 +35,6 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,18 +84,18 @@ public class JobHistory extends Abstract
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
- private static final Pattern DATE_PATTERN = Pattern
- .compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
-
/*
* TODO Get rid of this once JobId has it's own comparator
*/
- private static final Comparator<JobId> JOB_ID_COMPARATOR = new Comparator<JobId>() {
+ private static final Comparator<JobId> JOB_ID_COMPARATOR =
+ new Comparator<JobId>() {
@Override
public int compare(JobId o1, JobId o2) {
- if (o1.getAppId().getClusterTimestamp() > o2.getAppId().getClusterTimestamp()) {
+ if (o1.getAppId().getClusterTimestamp() >
+ o2.getAppId().getClusterTimestamp()) {
return 1;
- } else if (o1.getAppId().getClusterTimestamp() < o2.getAppId().getClusterTimestamp()) {
+ } else if (o1.getAppId().getClusterTimestamp() <
+ o2.getAppId().getClusterTimestamp()) {
return -1;
} else {
return o1.getId() - o2.getId();
@@ -106,7 +103,8 @@ public class JobHistory extends Abstract
}
};
- private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils.doneSubdirsBeforeSerialTail();
+ private static String DONE_BEFORE_SERIAL_TAIL =
+ JobHistoryUtils.doneSubdirsBeforeSerialTail();
/**
* Maps between a serial number (generated based on jobId) and the timestamp
@@ -114,29 +112,32 @@ public class JobHistory extends Abstract
* Facilitates jobId based searches.
* If a jobId is not found in this list - it will not be found.
*/
- private final SortedMap<String, Set<String>> idToDateString = new ConcurrentSkipListMap<String, Set<String>>();
+ private final SortedMap<String, Set<String>> idToDateString =
+ new ConcurrentSkipListMap<String, Set<String>>();
//Maintains minimal details for recent jobs (parsed from history file name).
//Sorted on Job Completion Time.
- private final SortedMap<JobId, MetaInfo> jobListCache = new ConcurrentSkipListMap<JobId, MetaInfo>(
- JOB_ID_COMPARATOR);
+ private final SortedMap<JobId, MetaInfo> jobListCache =
+ new ConcurrentSkipListMap<JobId, MetaInfo>(JOB_ID_COMPARATOR);
// Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
// Check for existance of the object when using iterators.
- private final SortedMap<JobId, MetaInfo> intermediateListCache = new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(
- JOB_ID_COMPARATOR);
+ private final SortedMap<JobId, MetaInfo> intermediateListCache =
+ new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(JOB_ID_COMPARATOR);
//Maintains a list of known done subdirectories. Not currently used.
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
- private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
- JOB_ID_COMPARATOR);
+ private final SortedMap<JobId, Job> loadedJobCache =
+ new ConcurrentSkipListMap<JobId, Job>(JOB_ID_COMPARATOR);
/**
- * Maintains a mapping between intermediate user directories and the last known modification time.
+ * Maintains a mapping between intermediate user directories and the last
+ * known modification time.
*/
- private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+ private Map<String, Long> userDirModificationTimeMap =
+ new HashMap<String, Long>();
//The number of jobs to maintain in the job list cache.
private int jobListCacheSize;
@@ -187,7 +188,8 @@ public class JobHistory extends Abstract
debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
serialNumberLowDigits = debugMode ? 1 : 3;
serialNumberFormat = ("%0"
- + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
+ + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
+ + serialNumberLowDigits) + "d");
String doneDirPrefix = null;
doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
@@ -195,9 +197,11 @@ public class JobHistory extends Abstract
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
new Path(doneDirPrefix));
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
- mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
+ mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
+ JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
} catch (IOException e) {
- throw new YarnException("Error creating done directory: [" + doneDirPrefixPath + "]", e);
+ throw new YarnException("Error creating done directory: [" +
+ doneDirPrefixPath + "]", e);
}
String intermediateDoneDirPrefix = null;
@@ -208,21 +212,27 @@ public class JobHistory extends Abstract
.makeQualified(new Path(intermediateDoneDirPrefix));
intermediateDoneDirFc = FileContext.getFileContext(
intermediateDoneDirPath.toUri(), conf);
- mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+ mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
+ JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
} catch (IOException e) {
LOG.info("error creating done directory on dfs " + e);
- throw new YarnException("Error creating intermediate done directory: [" + intermediateDoneDirPath + "]", e);
+ throw new YarnException("Error creating intermediate done directory: ["
+ + intermediateDoneDirPath + "]", e);
}
- jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, DEFAULT_JOBLIST_CACHE_SIZE);
- loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, DEFAULT_LOADEDJOB_CACHE_SIZE);
- dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, DEFAULT_DATESTRING_CACHE_SIZE);
+ jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+ DEFAULT_JOBLIST_CACHE_SIZE);
+ loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
+ DEFAULT_LOADEDJOB_CACHE_SIZE);
+ dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
+ DEFAULT_DATESTRING_CACHE_SIZE);
moveThreadInterval =
conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
DEFAULT_MOVE_THREAD_INTERVAL);
- numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, DEFAULT_MOVE_THREAD_COUNT);
+ numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+ DEFAULT_MOVE_THREAD_COUNT);
try {
initExisting();
} catch (IOException e) {
@@ -254,19 +264,21 @@ public class JobHistory extends Abstract
@Override
public void start() {
//Start moveIntermediatToDoneThread
- moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+ moveIntermediateToDoneRunnable =
+ new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
moveIntermediateToDoneThread.start();
//Start historyCleaner
- boolean startCleanerService = conf.getBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
+ boolean startCleanerService = conf.getBoolean(
+ JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
- long maxAgeOfHistoryFiles = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
- DEFAULT_HISTORY_MAX_AGE);
+ long maxAgeOfHistoryFiles = conf.getLong(
+ JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
- long runInterval = conf.getLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
- DEFAULT_RUN_INTERVAL);
+ long runInterval = conf.getLong(
+ JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
cleanerScheduledExecutor
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
@@ -331,13 +343,16 @@ public class JobHistory extends Abstract
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
String serialPart = serialDirPath.getName();
- String timeStampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
+ String timeStampPart =
+ JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
if (timeStampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
+ LOG.warn("Could not find timestamp portion from path: " +
+ serialDirPath.toString() +". Continuing with next");
return;
}
if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
+ LOG.warn("Could not find serial portion from path: " +
+ serialDirPath.toString() + ". Continuing with next");
return;
}
if (idToDateString.containsKey(serialPart)) {
@@ -355,13 +370,16 @@ public class JobHistory extends Abstract
LOG.debug("Adding "+serialDirPath+" to serial index");
}
String serialPart = serialDirPath.getName();
- String timestampPart = JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
+ String timestampPart =
+ JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
if (timestampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " + serialDirPath.toString() +". Continuing with next");
+ LOG.warn("Could not find timestamp portion from path: " +
+ serialDirPath.toString() +". Continuing with next");
return;
}
if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next");
+ LOG.warn("Could not find serial portion from path: " +
+ serialDirPath.toString() + ". Continuing with next");
}
addToSerialNumberIndex(serialPart, timestampPart);
}
@@ -400,7 +418,8 @@ public class JobHistory extends Abstract
}
}
- private static List<FileStatus> scanDirectory(Path path, FileContext fc, PathFilter pathFilter) throws IOException {
+ private static List<FileStatus> scanDirectory(Path path, FileContext fc,
+ PathFilter pathFilter) throws IOException {
path = fc.makeQualified(path);
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
@@ -414,7 +433,8 @@ public class JobHistory extends Abstract
return jhStatusList;
}
- private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, FileContext fc) throws IOException {
+ private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
+ FileContext fc) throws IOException {
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
}
@@ -425,7 +445,8 @@ public class JobHistory extends Abstract
* @return
*/
private List<FileStatus> findTimestampedDirectories() throws IOException {
- List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
+ List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
+ doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
return fsList;
}
@@ -434,7 +455,8 @@ public class JobHistory extends Abstract
*/
private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+jobId+" to job list cache with "+metaInfo.getJobIndexInfo());
+ LOG.debug("Adding "+jobId+" to job list cache with "
+ +metaInfo.getJobIndexInfo());
}
jobListCache.put(jobId, metaInfo);
if (jobListCache.size() > jobListCacheSize) {
@@ -462,14 +484,16 @@ public class JobHistory extends Abstract
* @throws IOException
*/
private void scanIntermediateDirectory() throws IOException {
- List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
+ List<FileStatus> userDirList =
+ JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
for (FileStatus userDir : userDirList) {
String name = userDir.getPath().getName();
long newModificationTime = userDir.getModificationTime();
boolean shouldScan = false;
synchronized (userDirModificationTimeMap) {
- if (!userDirModificationTimeMap.containsKey(name) || newModificationTime > userDirModificationTimeMap.get(name)) {
+ if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
+ > userDirModificationTimeMap.get(name)) {
shouldScan = true;
userDirModificationTimeMap.put(name, newModificationTime);
}
@@ -514,9 +538,11 @@ public class JobHistory extends Abstract
* @return A MetaInfo object for the jobId, null if not found.
* @throws IOException
*/
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException {
+ private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
+ throws IOException {
for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
+ JobIndexInfo jobIndexInfo =
+ FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
@@ -549,7 +575,8 @@ public class JobHistory extends Abstract
}
for (String timestampPart : dateStringSet) {
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
+ List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
+ doneDirFc);
MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
if (metaInfo != null) {
return metaInfo;
@@ -559,7 +586,8 @@ public class JobHistory extends Abstract
}
/**
- * Checks for the existence of the job history file in the interemediate directory.
+ * Checks for the existence of the job history file in the intermediate
+ * directory.
* @param jobId
* @return
* @throws IOException
@@ -586,7 +614,8 @@ public class JobHistory extends Abstract
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
this.sleepTime = sleepTime;
- moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+ moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
running = true;
}
@@ -604,7 +633,8 @@ public class JobHistory extends Abstract
try {
moveToDone(metaInfo);
} catch (IOException e) {
- LOG.info("Failed to process metaInfo for job: " + metaInfo.jobIndexInfo.getJobId(), e);
+ LOG.info("Failed to process metaInfo for job: " +
+ metaInfo.jobIndexInfo.getJobId(), e);
}
}
});
@@ -629,38 +659,17 @@ public class JobHistory extends Abstract
synchronized(metaInfo) {
try {
Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
- metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser());
+ metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
+ metaInfo.getConfFile());
addToLoadedJobCache(job);
return job;
} catch (IOException e) {
- throw new YarnException("Could not find/load job: " + metaInfo.getJobIndexInfo().getJobId(), e);
+ throw new YarnException("Could not find/load job: " +
+ metaInfo.getJobIndexInfo().getJobId(), e);
}
}
}
- private SortedMap<JobId, JobIndexInfo> getAllJobsMetaInfo() {
- SortedMap<JobId, JobIndexInfo> result = new TreeMap<JobId, JobIndexInfo>(JOB_ID_COMPARATOR);
- try {
- scanIntermediateDirectory();
- } catch (IOException e) {
- LOG.warn("Failed to scan intermediate directory", e);
- throw new YarnException(e);
- }
- for (JobId jobId : intermediateListCache.keySet()) {
- MetaInfo mi = intermediateListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, mi.getJobIndexInfo());
- }
- }
- for (JobId jobId : jobListCache.keySet()) {
- MetaInfo mi = jobListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, mi.getJobIndexInfo());
- }
- }
- return result;
- }
-
private Map<JobId, Job> getAllJobsInternal() {
//TODO This should ideally be using getAllJobsMetaInfo
// or get rid of that method once Job has APIs for user, finishTime etc.
@@ -746,108 +755,6 @@ public class JobHistory extends Abstract
return null;
}
- /**
- * Searches cached jobs for the specified criteria (AND). Ignores the criteria if null.
- * @param soughtUser
- * @param soughtJobNameSubstring
- * @param soughtDateStrings
- * @return
- */
- private Map<JobId, Job> findJobs(String soughtUser, String soughtJobNameSubstring, String[] soughtDateStrings) {
- boolean searchUser = true;
- boolean searchJobName = true;
- boolean searchDates = true;
- List<Calendar> soughtCalendars = null;
-
- if (soughtUser == null) {
- searchUser = false;
- }
- if (soughtJobNameSubstring == null) {
- searchJobName = false;
- }
- if (soughtDateStrings == null) {
- searchDates = false;
- } else {
- soughtCalendars = getSoughtDateAsCalendar(soughtDateStrings);
- }
-
- Map<JobId, Job> resultMap = new TreeMap<JobId, Job>();
-
- SortedMap<JobId, JobIndexInfo> allJobs = getAllJobsMetaInfo();
- for (Map.Entry<JobId, JobIndexInfo> entry : allJobs.entrySet()) {
- JobId jobId = entry.getKey();
- JobIndexInfo indexInfo = entry.getValue();
- String jobName = indexInfo.getJobName();
- String jobUser = indexInfo.getUser();
- long finishTime = indexInfo.getFinishTime();
-
- if (searchUser) {
- if (!soughtUser.equals(jobUser)) {
- continue;
- }
- }
-
- if (searchJobName) {
- if (!jobName.contains(soughtJobNameSubstring)) {
- continue;
- }
- }
-
- if (searchDates) {
- boolean matchedDate = false;
- Calendar jobCal = Calendar.getInstance();
- jobCal.setTimeInMillis(finishTime);
- for (Calendar cal : soughtCalendars) {
- if (jobCal.get(Calendar.YEAR) == cal.get(Calendar.YEAR) &&
- jobCal.get(Calendar.MONTH) == cal.get(Calendar.MONTH) &&
- jobCal.get(Calendar.DAY_OF_MONTH) == cal.get(Calendar.DAY_OF_MONTH)) {
- matchedDate = true;
- break;
- }
- }
- if (!matchedDate) {
- break;
- }
- }
- resultMap.put(jobId, new PartialJob(indexInfo, jobId));
- }
- return resultMap;
- }
-
- private List<Calendar> getSoughtDateAsCalendar(String [] soughtDateStrings) {
- List<Calendar> soughtCalendars = new ArrayList<Calendar>();
- for (int i = 0 ; i < soughtDateStrings.length ; i++) {
- String soughtDate = soughtDateStrings[i];
- if (soughtDate.length() != 0) {
- Matcher m = DATE_PATTERN.matcher(soughtDate);
- if (m.matches()) {
- String yyyyPart = m.group(3);
- String mmPart = m.group(1);
- String ddPart = m.group(2);
-
- if (yyyyPart.length() == 2) {
- yyyyPart = "20" + yyyyPart;
- }
- if (mmPart.length() == 1) {
- mmPart = "0" + mmPart;
- }
- if (ddPart.length() == 1) {
- ddPart = "0" + ddPart;
- }
- Calendar soughtCal = Calendar.getInstance();
- soughtCal.set(Calendar.YEAR, Integer.parseInt(yyyyPart));
- soughtCal.set(Calendar.MONTH, Integer.parseInt(mmPart) - 1);
- soughtCal.set(Calendar.DAY_OF_MONTH, Integer.parseInt(ddPart) -1);
- soughtCalendars.add(soughtCal);
- }
- }
- }
- return soughtCalendars;
- }
-
-
-
-
private void moveToDone(MetaInfo metaInfo) throws IOException {
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
if (completeTime == 0) completeTime = System.currentTimeMillis();
@@ -890,26 +797,31 @@ public class JobHistory extends Abstract
try {
maybeMakeSubdirectory(targetDir);
} catch (IOException e) {
- LOG.warn("Failed creating subdirectory: " + targetDir + " while attempting to move files for jobId: " + jobId);
+ LOG.warn("Failed creating subdirectory: " + targetDir +
+ " while attempting to move files for jobId: " + jobId);
throw e;
}
synchronized (metaInfo) {
if (historyFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile.getName()));
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir,
+ historyFile.getName()));
try {
moveToDoneNow(historyFile, toPath);
} catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
+ LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ + jobId);
throw e;
}
metaInfo.setHistoryFile(toPath);
}
if (confFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile.getName()));
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir,
+ confFile.getName()));
try {
moveToDoneNow(confFile, toPath);
} catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
+ LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ + jobId);
throw e;
}
metaInfo.setConfFile(toPath);
@@ -953,7 +865,8 @@ public class JobHistory extends Abstract
}
} catch (FileNotFoundException fnfE) {
try {
- FsPermission fsp = new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
+ FsPermission fsp =
+ new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
doneDirFc.mkdir(path, fsp, true);
FileStatus fsStatus = doneDirFc.getFileStatus(path);
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
@@ -972,12 +885,15 @@ public class JobHistory extends Abstract
}
private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
- return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
+ return new Path(doneDirPrefixPath,
+ JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
}
private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
- String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
- return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
+ String timestampComponent =
+ JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
+ return new Path(doneDirPrefixPath,
+ JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
}
@@ -1033,12 +949,13 @@ public class JobHistory extends Abstract
private Path summaryFile;
JobIndexInfo jobIndexInfo;
- MetaInfo(Path historyFile, Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo) {
+ MetaInfo(Path historyFile, Path confFile, Path summaryFile,
+ JobIndexInfo jobIndexInfo) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
- }
+ }
Path getHistoryFile() { return historyFile; }
Path getConfFile() { return confFile; }
@@ -1073,13 +990,19 @@ public class JobHistory extends Abstract
//Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
- List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
+ List<FileStatus> historyFileList =
+ scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
- long effectiveTimestamp = getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
+ JobIndexInfo jobIndexInfo =
+ FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
+ long effectiveTimestamp =
+ getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
if (shouldDelete(effectiveTimestamp)) {
- String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), null, jobIndexInfo);
+ String confFileName =
+ JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
+ MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
+ new Path(historyFile.getPath().getParent(), confFileName),
+ null, jobIndexInfo);
delete(metaInfo);
} else {
halted = true;
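Note on the hunks above: the history cleaner wiring added to start() follows the standard ScheduledThreadPoolExecutor fixed-rate pattern: an initial 30 second delay, then one run per configured interval. A minimal standalone sketch of the same shape, with hypothetical constants standing in for the JHAdminConfig values and a stub body in place of the real HistoryCleaner:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CleanerSchedulingSketch {
  public static void main(String[] args) {
    // Hypothetical stand-ins for the MR_HISTORY_MAX_AGE_MS and
    // MR_HISTORY_CLEANER_INTERVAL_MS values read from the configuration.
    final long maxAgeMs = 7L * 24 * 60 * 60 * 1000;   // keep a week of history
    long runIntervalMs = 24L * 60 * 60 * 1000;        // scan once a day

    ScheduledThreadPoolExecutor cleaner = new ScheduledThreadPoolExecutor(1);
    // Same shape as JobHistory.start() above: first run after a 30 second
    // delay, then repeat at the configured interval.
    cleaner.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        long cutoff = System.currentTimeMillis() - maxAgeMs;
        // A real cleaner would scan the done directories and delete any
        // history/conf file whose effective timestamp is older than 'cutoff'.
        System.out.println("Would delete history files older than " + cutoff);
      }
    }, 30 * 1000L, runIntervalMs, TimeUnit.MILLISECONDS);
  }
}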
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Thu Sep 15 23:07:48 2011
@@ -74,14 +74,15 @@ public class JobHistoryServer extends Co
public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
- JobHistoryServer server = null;
try {
- server = new JobHistoryServer();
+ JobHistoryServer jobHistoryServer = new JobHistoryServer();
+ Runtime.getRuntime().addShutdownHook(
+ new CompositeServiceShutdownHook(jobHistoryServer));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
- server.init(conf);
- server.start();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
+ jobHistoryServer.init(conf);
+ jobHistoryServer.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting JobHistoryServer", t);
System.exit(-1);
}
}
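The main() change above registers the shutdown hook before init()/start(), so the service is stopped even when startup fails part-way. A minimal sketch of that ordering, using a hypothetical SimpleService in place of the real CompositeService and CompositeServiceShutdownHook types:

public class ShutdownHookSketch {
  /** Hypothetical stand-in for a CompositeService with init/start/stop. */
  static class SimpleService {
    void init()  { System.out.println("init");  }
    void start() { System.out.println("start"); }
    void stop()  { System.out.println("stop");  }
  }

  public static void main(String[] args) {
    final SimpleService server = new SimpleService();
    // Register the hook before init/start, as the JobHistoryServer change
    // above does, so cleanup still runs if startup fails part-way.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        server.stop();
      }
    });
    try {
      server.init();
      server.start();
    } catch (Throwable t) {
      t.printStackTrace();
      System.exit(-1);
    }
  }
}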
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Thu Sep 15 23:07:48 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import clover.org.apache.log4j.Logger;
@@ -147,4 +149,14 @@ public class PartialJob implements org.a
return jobIndexInfo.getUser();
}
+ @Override
+ public Path getConfFile() {
+ throw new IllegalStateException("Not implemented yet");
+ }
+
+ @Override
+ public Map<JobACL, AccessControlList> getJobACLs() {
+ throw new IllegalStateException("Not implemented yet");
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsController.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsController.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsController.java Thu Sep 15 23:07:48 2011
@@ -78,7 +78,16 @@ public class HsController extends AppCon
protected Class<? extends View> taskPage() {
return HsTaskPage.class;
}
-
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.v2.app.webapp.AppController#attemptsPage()
+ */
+ @Override
+ protected Class<? extends View> attemptsPage() {
+ return HsAttemptsPage.class;
+ }
+
// Need all of these methods here also as Guice doesn't look into parent
// classes.
@@ -128,6 +137,21 @@ public class HsController extends AppCon
}
/**
+ * @return the page that will be used to render the /conf page
+ */
+ protected Class<? extends View> confPage() {
+ return HsConfPage.class;
+ }
+
+ /**
+ * Render the /conf page
+ */
+ public void conf() {
+ requireJob();
+ render(confPage());
+ }
+
+ /**
* @return the page about the current server.
*/
protected Class<? extends View> aboutPage() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java Thu Sep 15 23:07:48 2011
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.v2.h
import com.google.inject.Inject;
import java.util.Date;
+import java.util.List;
import java.util.Map;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -32,12 +35,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
-import static org.apache.hadoop.yarn.util.StringHelper.*;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
/**
@@ -46,21 +50,18 @@ import static org.apache.hadoop.yarn.web
public class HsJobBlock extends HtmlBlock {
final AppContext appContext;
- int runningMapTasks = 0;
- int pendingMapTasks = 0;
- int runningReduceTasks = 0;
- int pendingReduceTasks = 0;
-
- int newMapAttempts = 0;
- int runningMapAttempts = 0;
int killedMapAttempts = 0;
int failedMapAttempts = 0;
int successfulMapAttempts = 0;
- int newReduceAttempts = 0;
- int runningReduceAttempts = 0;
int killedReduceAttempts = 0;
int failedReduceAttempts = 0;
int successfulReduceAttempts = 0;
+ long avgMapTime = 0;
+ long avgReduceTime = 0;
+ long avgShuffleTime = 0;
+ long avgSortTime = 0;
+ int numMaps;
+ int numReduces;
@Inject HsJobBlock(AppContext appctx) {
appContext = appctx;
@@ -84,9 +85,9 @@ public class HsJobBlock extends HtmlBloc
p()._("Sorry, ", jid, " not found.")._();
return;
}
+ Map<JobACL, AccessControlList> acls = job.getJobACLs();
+
JobReport jobReport = job.getReport();
- String mapPct = percent(jobReport.getMapProgress());
- String reducePct = percent(jobReport.getReduceProgress());
int mapTasks = job.getTotalMaps();
int mapTasksComplete = job.getCompletedMaps();
int reduceTasks = job.getTotalReduces();
@@ -94,13 +95,38 @@ public class HsJobBlock extends HtmlBloc
long startTime = jobReport.getStartTime();
long finishTime = jobReport.getFinishTime();
countTasksAndAttempts(job);
- info("Job Overview").
+ ResponseInfo infoBlock = info("Job Overview").
_("Job Name:", job.getName()).
+ _("User Name:", job.getUserName()).
_("State:", job.getState()).
_("Uberized:", job.isUber()).
_("Started:", new Date(startTime)).
+ _("Finished:", new Date(finishTime)).
_("Elapsed:", StringUtils.formatTime(
- Times.elapsed(startTime, finishTime)));
+ Times.elapsed(startTime, finishTime, false)));
+
+ List<String> diagnostics = job.getDiagnostics();
+ if(diagnostics != null && !diagnostics.isEmpty()) {
+ StringBuffer b = new StringBuffer();
+ for(String diag: diagnostics) {
+ b.append(diag);
+ }
+ infoBlock._("Diagnostics:", b.toString());
+ }
+
+ if(numMaps > 0) {
+ infoBlock._("Average Map Time", StringUtils.formatTime(avgMapTime));
+ }
+ if(numReduces > 0) {
+ infoBlock._("Average Reduce Time", StringUtils.formatTime(avgReduceTime));
+ infoBlock._("Average Shuffle Time", StringUtils.formatTime(avgShuffleTime));
+ infoBlock._("Average Merge Time", StringUtils.formatTime(avgSortTime));
+ }
+
+ for(Map.Entry<JobACL, AccessControlList> entry : acls.entrySet()) {
+ infoBlock._("ACL "+entry.getKey().getAclName()+":",
+ entry.getValue().getAclString());
+ }
html.
_(InfoBlock.class).
div(_INFO_WRAP).
@@ -109,34 +135,17 @@ public class HsJobBlock extends HtmlBloc
table("#job").
tr().
th(_TH, "Task Type").
- th(_TH, "Progress").
th(_TH, "Total").
- th(_TH, "Pending").
- th(_TH, "Running").
th(_TH, "Complete")._().
tr(_ODD).
th().
a(url("tasks", jid, "m"), "Map")._().
- td().
- div(_PROGRESSBAR).
- $title(join(mapPct, '%')). // tooltip
- div(_PROGRESSBAR_VALUE).
- $style(join("width:", mapPct, '%'))._()._()._().
td(String.valueOf(mapTasks)).
- td(String.valueOf(pendingMapTasks)).
- td(String.valueOf(runningMapTasks)).
td(String.valueOf(mapTasksComplete))._().
tr(_EVEN).
th().
a(url("tasks", jid, "r"), "Reduce")._().
- td().
- div(_PROGRESSBAR).
- $title(join(reducePct, '%')). // tooltip
- div(_PROGRESSBAR_VALUE).
- $style(join("width:", reducePct, '%'))._()._()._().
td(String.valueOf(reduceTasks)).
- td(String.valueOf(pendingReduceTasks)).
- td(String.valueOf(runningReduceTasks)).
td(String.valueOf(reducesTasksComplete))._()
._().
@@ -144,20 +153,12 @@ public class HsJobBlock extends HtmlBloc
table("#job").
tr().
th(_TH, "Attempt Type").
- th(_TH, "New").
- th(_TH, "Running").
th(_TH, "Failed").
th(_TH, "Killed").
th(_TH, "Successful")._().
tr(_ODD).
th("Maps").
td().a(url("attempts", jid, "m",
- TaskAttemptStateUI.NEW.toString()),
- String.valueOf(newMapAttempts))._().
- td().a(url("attempts", jid, "m",
- TaskAttemptStateUI.RUNNING.toString()),
- String.valueOf(runningMapAttempts))._().
- td().a(url("attempts", jid, "m",
TaskAttemptStateUI.FAILED.toString()),
String.valueOf(failedMapAttempts))._().
td().a(url("attempts", jid, "m",
@@ -170,12 +171,6 @@ public class HsJobBlock extends HtmlBloc
tr(_EVEN).
th("Reduces").
td().a(url("attempts", jid, "r",
- TaskAttemptStateUI.NEW.toString()),
- String.valueOf(newReduceAttempts))._().
- td().a(url("attempts", jid, "r",
- TaskAttemptStateUI.RUNNING.toString()),
- String.valueOf(runningReduceAttempts))._().
- td().a(url("attempts", jid, "r",
TaskAttemptStateUI.FAILED.toString()),
String.valueOf(failedReduceAttempts))._().
td().a(url("attempts", jid, "r",
@@ -195,44 +190,21 @@ public class HsJobBlock extends HtmlBloc
* @param job the job to get counts for.
*/
private void countTasksAndAttempts(Job job) {
+ numReduces = 0;
+ numMaps = 0;
Map<TaskId, Task> tasks = job.getTasks();
for (Task task : tasks.values()) {
- switch (task.getType()) {
- case MAP:
- // Task counts
- switch (task.getState()) {
- case RUNNING:
- ++runningMapTasks;
- break;
- case SCHEDULED:
- ++pendingMapTasks;
- break;
- }
- break;
- case REDUCE:
- // Task counts
- switch (task.getState()) {
- case RUNNING:
- ++runningReduceTasks;
- break;
- case SCHEDULED:
- ++pendingReduceTasks;
- break;
- }
- break;
- }
-
// Attempts counts
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
for (TaskAttempt attempt : attempts.values()) {
- int newAttempts = 0, running = 0, successful = 0, failed = 0, killed =0;
+ int successful = 0, failed = 0, killed =0;
if (TaskAttemptStateUI.NEW.correspondsTo(attempt.getState())) {
- ++newAttempts;
+ //Do Nothing
} else if (TaskAttemptStateUI.RUNNING.correspondsTo(attempt
.getState())) {
- ++running;
+ //Do Nothing
} else if (TaskAttemptStateUI.SUCCESSFUL.correspondsTo(attempt
.getState())) {
++successful;
@@ -246,21 +218,41 @@ public class HsJobBlock extends HtmlBloc
switch (task.getType()) {
case MAP:
- newMapAttempts += newAttempts;
- runningMapAttempts += running;
successfulMapAttempts += successful;
failedMapAttempts += failed;
killedMapAttempts += killed;
+ if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
+ numMaps++;
+ avgMapTime += (attempt.getFinishTime() -
+ attempt.getLaunchTime());
+ }
break;
case REDUCE:
- newReduceAttempts += newAttempts;
- runningReduceAttempts += running;
successfulReduceAttempts += successful;
failedReduceAttempts += failed;
killedReduceAttempts += killed;
+ if(attempt.getState() == TaskAttemptState.SUCCEEDED) {
+ numReduces++;
+ avgShuffleTime += (attempt.getShuffleFinishTime() -
+ attempt.getLaunchTime());
+ avgSortTime += attempt.getSortFinishTime() -
+ attempt.getLaunchTime();
+ avgReduceTime += (attempt.getFinishTime() -
+ attempt.getShuffleFinishTime());
+ }
break;
}
}
}
+
+ if(numMaps > 0) {
+ avgMapTime = avgMapTime / numMaps;
+ }
+
+ if(numReduces > 0) {
+ avgReduceTime = avgReduceTime / numReduces;
+ avgShuffleTime = avgShuffleTime / numReduces;
+ avgSortTime = avgSortTime / numReduces;
+ }
}
}
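countTasksAndAttempts() now accumulates per-attempt durations for successful attempts only and divides once at the end to get the averages shown in the Job Overview block. A self-contained sketch of the same accumulation for reduce attempts, with a hypothetical Attempt record standing in for TaskAttempt:

import java.util.Arrays;
import java.util.List;

public class AverageTimesSketch {
  /** Hypothetical attempt record; the real code reads these from TaskAttempt. */
  static class Attempt {
    long launchTime, shuffleFinishTime, sortFinishTime, finishTime;
    boolean succeeded;
    Attempt(long launch, long shuffle, long sort, long finish, boolean ok) {
      launchTime = launch; shuffleFinishTime = shuffle;
      sortFinishTime = sort; finishTime = finish; succeeded = ok;
    }
  }

  /** Same accumulation as countTasksAndAttempts() above for reduces:
      sum per successful attempt, divide once at the end. */
  static long[] reduceAverages(List<Attempt> attempts) {
    long shuffle = 0, sort = 0, reduce = 0;
    int numReduces = 0;
    for (Attempt a : attempts) {
      if (!a.succeeded) continue;         // only successful attempts count
      numReduces++;
      shuffle += a.shuffleFinishTime - a.launchTime;
      sort    += a.sortFinishTime    - a.launchTime;
      reduce  += a.finishTime        - a.shuffleFinishTime;
    }
    if (numReduces == 0) return new long[] {0, 0, 0};
    return new long[] {shuffle / numReduces, sort / numReduces, reduce / numReduces};
  }

  public static void main(String[] args) {
    List<Attempt> attempts = Arrays.asList(
        new Attempt(0, 40, 60, 100, true),
        new Attempt(0, 20, 30, 60, true),
        new Attempt(0, 10, 15, 25, false));   // failed attempt is ignored
    System.out.println(Arrays.toString(reduceAverages(attempts)));
    // prints [30, 45, 50]: avg shuffle, avg merge/sort, avg reduce
  }
}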
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsNavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsNavBlock.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsNavBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsNavBlock.java Thu Sep 15 23:07:48 2011
@@ -52,6 +52,7 @@ public class HsNavBlock extends HtmlBloc
ul().
li().a(url("job", jobid), "Overview")._().
li().a(url("jobcounters", jobid), "Counters")._().
+ li().a(url("conf", jobid), "Configuration")._().
li().a(url("tasks", jobid, "m"), "Map tasks")._().
li().a(url("tasks", jobid, "r"), "Reduce tasks")._()._();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java Thu Sep 15 23:07:48 2011
@@ -18,28 +18,32 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
-import static org.apache.hadoop.yarn.util.StringHelper.percent;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.common.base.Joiner;
@@ -68,50 +72,162 @@ public class HsTaskPage extends HsView {
h2($(TITLE));
return;
}
- TBODY<TABLE<Hamlet>> tbody = html.
+ TaskType type = null;
+ String symbol = $(TASK_TYPE);
+ if (!symbol.isEmpty()) {
+ type = MRApps.taskType(symbol);
+ } else {
+ type = app.getTask().getType();
+ }
+
+ TR<THEAD<TABLE<Hamlet>>> headRow = html.
table("#attempts").
thead().
- tr().
+ tr();
+
+ headRow.
th(".id", "Attempt").
- th(".progress", "Progress").
th(".state", "State").
th(".node", "node").
- th(".tsh", "Started").
- th(".tsh", "Finished").
- th(".tsh", "Elapsed").
- th(".note", "Note")._()._().
- tbody();
+ th(".tsh", "Start Time");
+
+ if(type == TaskType.REDUCE) {
+ headRow.th("Shuffle Finish Time");
+ headRow.th("Merge Finish Time");
+ }
+
+ headRow.th("Finish Time"); //Attempt
+
+ if(type == TaskType.REDUCE) {
+ headRow.th("Elapsed Time Shuffle"); //Attempt
+ headRow.th("Elapsed Time Merge"); //Attempt
+ headRow.th("Elapsed Time Reduce"); //Attempt
+ }
+ headRow.th("Elapsed Time").
+ th(".note", "Note");
+
+ TBODY<TABLE<Hamlet>> tbody = headRow._()._().tbody();
for (TaskAttempt ta : getTaskAttempts()) {
String taid = MRApps.toString(ta.getID());
- String progress = percent(ta.getProgress());
- ContainerId containerId = ta.getAssignedContainerID();
String nodeHttpAddr = ta.getNodeHttpAddress();
- long startTime = ta.getLaunchTime();
- long finishTime = ta.getFinishTime();
- long elapsed = Times.elapsed(startTime, finishTime);
- TD<TR<TBODY<TABLE<Hamlet>>>> nodeTd = tbody.
- tr().
- td(".id", taid).
- td(".progress", progress).
- td(".state", ta.getState().toString()).
+
+ long attemptStartTime = ta.getLaunchTime();
+ long shuffleFinishTime = -1;
+ long sortFinishTime = -1;
+ long attemptFinishTime = ta.getFinishTime();
+ long elapsedShuffleTime = -1;
+ long elapsedSortTime = -1;
+ long elapsedReduceTime = -1;
+ if(type == TaskType.REDUCE) {
+ shuffleFinishTime = ta.getShuffleFinishTime();
+ sortFinishTime = ta.getSortFinishTime();
+ elapsedShuffleTime =
+ Times.elapsed(attemptStartTime, shuffleFinishTime, false);
+ elapsedSortTime =
+ Times.elapsed(shuffleFinishTime, sortFinishTime, false);
+ elapsedReduceTime =
+ Times.elapsed(sortFinishTime, attemptFinishTime, false);
+ }
+ long attemptElapsed =
+ Times.elapsed(attemptStartTime, attemptFinishTime, false);
+ int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
+
+ TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
+ row.
td().
- a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
- if (containerId != null) {
- String containerIdStr = ConverterUtils.toString(containerId);
- nodeTd._(" ").
- a(".logslink", url("http://", nodeHttpAddr, "yarn", "containerlogs",
- containerIdStr), "logs");
+ br().$title(String.valueOf(sortId))._(). // sorting
+ _(taid)._().
+ td(ta.getState().toString()).
+ td().a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr)._();
+
+ row.td().
+ br().$title(String.valueOf(attemptStartTime))._().
+ _(Times.format(attemptStartTime))._();
+
+ if(type == TaskType.REDUCE) {
+ row.td().
+ br().$title(String.valueOf(shuffleFinishTime))._().
+ _(Times.format(shuffleFinishTime))._();
+ row.td().
+ br().$title(String.valueOf(sortFinishTime))._().
+ _(Times.format(sortFinishTime))._();
}
- nodeTd._().
- td(".ts", Times.format(startTime)).
- td(".ts", Times.format(finishTime)).
- td(".dt", StringUtils.formatTime(elapsed)).
- td(".note", Joiner.on('\n').join(ta.getDiagnostics()))._();
+ row.
+ td().
+ br().$title(String.valueOf(attemptFinishTime))._().
+ _(Times.format(attemptFinishTime))._();
+
+ if(type == TaskType.REDUCE) {
+ row.td().
+ br().$title(String.valueOf(elapsedShuffleTime))._().
+ _(formatTime(elapsedShuffleTime))._();
+ row.td().
+ br().$title(String.valueOf(elapsedSortTime))._().
+ _(formatTime(elapsedSortTime))._();
+ row.td().
+ br().$title(String.valueOf(elapsedReduceTime))._().
+ _(formatTime(elapsedReduceTime))._();
+ }
+
+ row.
+ td().
+ br().$title(String.valueOf(attemptElapsed))._().
+ _(formatTime(attemptElapsed))._().
+ td(".note", Joiner.on('\n').join(ta.getDiagnostics()));
+ row._();
+ }
+
+
+ TR<TFOOT<TABLE<Hamlet>>> footRow = tbody._().tfoot().tr();
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_name").$value("Attempt")._()._().
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_state").$value("State")._()._().
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_node").$value("Node")._()._().
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_start_time").$value("Start Time")._()._();
+
+ if(type == TaskType.REDUCE) {
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("shuffle_time").$value("Shuffle Time")._()._();
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("merge_time").$value("Merge Time")._()._();
}
- tbody._()._();
+
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_finish").$value("Finish Time")._()._();
+
+ if(type == TaskType.REDUCE) {
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("elapsed_shuffle_time").$value("Elapsed Shuffle Time")._()._();
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("elapsed_merge_time").$value("Elapsed Merge Time")._()._();
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("elapsed_reduce_time").$value("Elapsed Reduce Time")._()._();
+ }
+
+ footRow.
+ th().input("search_init").$type(InputType.text).
+ $name("attempt_elapsed").$value("Elapsed Time")._()._().
+ th().input("search_init").$type(InputType.text).
+ $name("note").$value("Note")._()._();
+
+ footRow._()._()._();
}
+ private String formatTime(long elapsed) {
+ return elapsed < 0 ? "N/A" : StringUtils.formatTime(elapsed);
+ }
+
/**
* @return true if this is a valid request else false.
*/
@@ -138,6 +254,7 @@ public class HsTaskPage extends HsView {
//Set up the java script and CSS for the attempts table
set(DATATABLES_ID, "attempts");
set(initID(DATATABLES, "attempts"), attemptsTableInit());
+ set(postInitID(DATATABLES, "attempts"), attemptsPostTableInit());
setTableStyles(html, "attempts");
}
@@ -154,6 +271,49 @@ public class HsTaskPage extends HsView {
* attempts table.
*/
private String attemptsTableInit() {
- return tableInit().append("}").toString();
+ TaskType type = null;
+ String symbol = $(TASK_TYPE);
+ if (!symbol.isEmpty()) {
+ type = MRApps.taskType(symbol);
+ } else {
+ TaskId taskID = MRApps.toTaskID($(TASK_ID));
+ type = taskID.getTaskType();
+ }
+ StringBuilder b = tableInit().
+ append(",aoColumnDefs:[");
+
+ b.append("{'sType':'title-numeric', 'aTargets': [ 0");
+ if(type == TaskType.REDUCE) {
+ b.append(", 7, 8, 9, 10");
+ } else { //MAP
+ b.append(", 5");
+ }
+ b.append(" ] }");
+ b.append("]}");
+ return b.toString();
+ }
+
+ private String attemptsPostTableInit() {
+ return "var asInitVals = new Array();\n" +
+ "$('tfoot input').keyup( function () \n{"+
+ " attemptsDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
+ "} );\n"+
+ "$('tfoot input').each( function (i) {\n"+
+ " asInitVals[i] = this.value;\n"+
+ "} );\n"+
+ "$('tfoot input').focus( function () {\n"+
+ " if ( this.className == 'search_init' )\n"+
+ " {\n"+
+ " this.className = '';\n"+
+ " this.value = '';\n"+
+ " }\n"+
+ "} );\n"+
+ "$('tfoot input').blur( function (i) {\n"+
+ " if ( this.value == '' )\n"+
+ " {\n"+
+ " this.className = 'search_init';\n"+
+ " this.value = asInitVals[$('tfoot input').index(this)];\n"+
+ " }\n"+
+ "} );\n";
}
}
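The reworked attempts table above reports per-phase elapsed times, using -1 as the sentinel for phases that never finished (the shuffle/merge columns for map tasks, or any phase of an unfinished attempt) and rendering that sentinel as N/A via the new formatTime() helper. A small sketch of that convention; the exact behaviour of Times.elapsed(start, finish, false) is assumed here, not quoted:

public class ElapsedTimeSketch {
  /** Roughly what Times.elapsed(started, finished, false) is assumed to do
      here: return -1 when the phase has not finished yet. */
  static long elapsed(long started, long finished) {
    if (started <= 0 || finished <= 0) return -1;
    return finished - started;
  }

  /** Same idea as the formatTime() helper added above: negative elapsed
      values render as N/A instead of a bogus duration. */
  static String formatTime(long elapsedMs) {
    return elapsedMs < 0 ? "N/A" : (elapsedMs / 1000) + "sec";
  }

  public static void main(String[] args) {
    long launchTime = 1000L;
    long shuffleFinishTime = 0L;              // shuffle never finished
    long attemptFinishTime = 61000L;
    System.out.println(formatTime(elapsed(launchTime, shuffleFinishTime))); // N/A
    System.out.println(formatTime(elapsed(launchTime, attemptFinishTime))); // 60sec
  }
}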
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java Thu Sep 15 23:07:48 2011
@@ -18,13 +18,16 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
-import org.apache.hadoop.mapreduce.v2.app.webapp.TasksBlock;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.SubView;
/**
@@ -41,15 +44,16 @@ public class HsTasksPage extends HsView
set(DATATABLES_ID, "tasks");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(initID(DATATABLES, "tasks"), tasksTableInit());
+ set(postInitID(DATATABLES, "tasks"), jobsPostTableInit());
setTableStyles(html, "tasks");
}
-
+
/**
* The content of this page is the TasksBlock
- * @return TasksBlock.class
+ * @return HsTasksBlock.class
*/
@Override protected Class<? extends SubView> content() {
- return TasksBlock.class;
+ return HsTasksBlock.class;
}
/**
@@ -57,9 +61,45 @@ public class HsTasksPage extends HsView
* for the tasks table.
*/
private String tasksTableInit() {
- return tableInit().
- append(",aoColumns:[{sType:'title-numeric'},{sType:'title-numeric',").
- append("bSearchable:false},null,{sType:'title-numeric'},").
- append("{sType:'title-numeric'},{sType:'title-numeric'}]}").toString();
+ TaskType type = null;
+ String symbol = $(TASK_TYPE);
+ if (!symbol.isEmpty()) {
+ type = MRApps.taskType(symbol);
+ }
+ StringBuilder b = tableInit().
+ append(",aoColumnDefs:[");
+ b.append("{'sType':'title-numeric', 'aTargets': [ 0, 4");
+ if(type == TaskType.REDUCE) {
+ b.append(", 9, 10, 11, 12");
+ } else { //MAP
+ b.append(", 7");
+ }
+ b.append(" ] }");
+ b.append("]}");
+ return b.toString();
+ }
+
+ private String jobsPostTableInit() {
+ return "var asInitVals = new Array();\n" +
+ "$('tfoot input').keyup( function () \n{"+
+ " tasksDataTable.fnFilter( this.value, $('tfoot input').index(this) );\n"+
+ "} );\n"+
+ "$('tfoot input').each( function (i) {\n"+
+ " asInitVals[i] = this.value;\n"+
+ "} );\n"+
+ "$('tfoot input').focus( function () {\n"+
+ " if ( this.className == 'search_init' )\n"+
+ " {\n"+
+ " this.className = '';\n"+
+ " this.value = '';\n"+
+ " }\n"+
+ "} );\n"+
+ "$('tfoot input').blur( function (i) {\n"+
+ " if ( this.value == '' )\n"+
+ " {\n"+
+ " this.className = 'search_init';\n"+
+ " this.value = asInitVals[$('tfoot input').index(this)];\n"+
+ " }\n"+
+ "} );\n";
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Thu Sep 15 23:07:48 2011
@@ -39,6 +39,7 @@ public class HsWebApp extends WebApp imp
route("/", HsController.class);
route("/app", HsController.class);
route(pajoin("/job", JOB_ID), HsController.class, "job");
+ route(pajoin("/conf", JOB_ID), HsController.class, "conf");
route(pajoin("/jobcounters", JOB_ID), HsController.class, "jobCounters");
route(pajoin("/tasks", JOB_ID, TASK_TYPE), HsController.class, "tasks");
route(pajoin("/attempts", JOB_ID, TASK_TYPE, ATTEMPT_STATE),
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java Thu Sep 15 23:07:48 2011
@@ -19,25 +19,36 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.ATTEMPT_STATE;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
+import org.apache.hadoop.mapreduce.v2.app.webapp.TestAMWebApp;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Test;
import com.google.inject.Injector;
public class TestHSWebApp {
+ private static final Log LOG = LogFactory.getLog(TestHSWebApp.class);
static class TestAppContext implements AppContext {
final ApplicationAttemptId appAttemptID;
@@ -111,16 +122,53 @@ public class TestHSWebApp {
}
@Test public void testJobView() {
- WebAppTests.testPage(HsJobPage.class, AppContext.class, new TestAppContext());
+ LOG.info("HsJobPage");
+ AppContext appContext = new TestAppContext();
+ Map<String, String> params = TestAMWebApp.getJobParams(appContext);
+ WebAppTests.testPage(HsJobPage.class, AppContext.class, appContext, params);
}
- @Test public void testTasksView() {
- WebAppTests.testPage(HsTasksPage.class, AppContext.class,
- new TestAppContext());
+ @Test
+ public void testTasksView() {
+ LOG.info("HsTasksPage");
+ AppContext appContext = new TestAppContext();
+ Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
+ WebAppTests.testPage(HsTasksPage.class, AppContext.class, appContext,
+ params);
+ }
+
+ @Test
+ public void testTaskView() {
+ LOG.info("HsTaskPage");
+ AppContext appContext = new TestAppContext();
+ Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
+ WebAppTests
+ .testPage(HsTaskPage.class, AppContext.class, appContext, params);
}
- @Test public void testTaskView() {
- WebAppTests.testPage(HsTaskPage.class, AppContext.class,
+ @Test public void testAttemptsWithJobView() {
+ LOG.info("HsAttemptsPage with data");
+ TestAppContext ctx = new TestAppContext();
+ JobId id = ctx.getAllJobs().keySet().iterator().next();
+ Map<String, String> params = new HashMap<String,String>();
+ params.put(JOB_ID, id.toString());
+ params.put(TASK_TYPE, "m");
+ params.put(ATTEMPT_STATE, "SUCCESSFUL");
+ WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
+ ctx, params);
+ }
+
+ @Test public void testAttemptsView() {
+ LOG.info("HsAttemptsPage");
+ AppContext appContext = new TestAppContext();
+ Map<String, String> params = TestAMWebApp.getTaskParams(appContext);
+ WebAppTests.testPage(HsAttemptsPage.class, AppContext.class,
+ appContext, params);
+ }
+
+ @Test public void testConfView() {
+ LOG.info("HsConfPage");
+ WebAppTests.testPage(HsConfPage.class, AppContext.class,
new TestAppContext());
}
}
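
Note on the hunk above: each page test now renders against a mock AppContext plus an explicit parameter map, so views that read JOB_ID, TASK_TYPE or ATTEMPT_STATE no longer depend on missing request parameters. A hedged sketch of the same pattern for a hypothetical reduce-side attempts test (not part of this commit; it assumes the imports and TestAppContext of the class above and the single-letter task-type symbols accepted by MRApps.taskType):

  @Test public void testReduceAttemptsView() {
    TestAppContext ctx = new TestAppContext();
    JobId id = ctx.getAllJobs().keySet().iterator().next();
    Map<String, String> params = new HashMap<String, String>();
    params.put(JOB_ID, id.toString());
    params.put(TASK_TYPE, "r");                 // "r" = reduce, as "m" = map above
    params.put(ATTEMPT_STATE, "SUCCESSFUL");
    WebAppTests.testPage(HsAttemptsPage.class, AppContext.class, ctx, params);
  }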
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Thu Sep 15 23:07:48 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
@@ -28,12 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
public class ClientCache {
@@ -72,16 +74,21 @@ public class ClientCache {
private MRClientProtocol instantiateHistoryProxy()
throws IOException {
- String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
+ final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
LOG.info("Connecting to HistoryServer at: " + serviceAddr);
- Configuration myConf = new Configuration(conf);
- //TODO This should ideally be using it's own class (instead of ClientRMSecurityInfo)
+ final Configuration myConf = new Configuration(conf);
myConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
- ClientRMSecurityInfo.class, SecurityInfo.class);
- YarnRPC rpc = YarnRPC.create(myConf);
+ ClientHSSecurityInfo.class, SecurityInfo.class);
+ final YarnRPC rpc = YarnRPC.create(myConf);
LOG.info("Connected to HistoryServer at: " + serviceAddr);
- return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
- NetUtils.createSocketAddr(serviceAddr), myConf);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() {
+ return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), myConf);
+ }
+ });
}
}
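
Note on the hunk above: the history proxy is now built inside UserGroupInformation#doAs, so the RPC connection is established under the calling user's credentials, with ClientHSSecurityInfo supplying the HistoryServer security metadata. A generic sketch of that doAs pattern (the class, method name and Callable-based factory are illustrative only, not part of this commit):

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.security.UserGroupInformation;

// Sketch of the pattern used above: whatever builds the RPC proxy runs inside
// PrivilegedAction#run(), so the connection is made as the current user rather
// than the JVM login user.
final class DoAsSketch {
  static <T> T connectAsCurrentUser(final Callable<T> proxyFactory) throws IOException {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    return currentUser.doAs(new PrivilegedAction<T>() {
      @Override
      public T run() {
        try {
          return proxyFactory.call(); // e.g. rpc.getProxy(MRClientProtocol.class, addr, conf)
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
  }
}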
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1171315&r1=1171314&r2=1171315&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Sep 15 23:07:48 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskR
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants;
@@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -237,7 +235,6 @@ public class YARNRunner implements Clien
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
- setupDistributedCache(conf, appContext);
// XXX Remove
in.close();
@@ -273,16 +270,18 @@ public class YARNRunner implements Clien
public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException {
- ApplicationSubmissionContext appContext =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
ApplicationId applicationId = resMgrDelegate.getApplicationId();
- appContext.setApplicationId(applicationId);
+
+ // Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
LOG.info("AppMaster capability = " + capability);
- appContext.setMasterCapability(capability);
+ // Setup LocalResources
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils
@@ -292,14 +291,11 @@ public class YARNRunner implements Clien
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
- appContext.setResource(MRConstants.JOB_SUBMIT_DIR,
- yarnUrlForJobSubmitDir);
-
- appContext.setResourceTodo(MRConstants.JOB_CONF_FILE,
+ localResources.put(MRConstants.JOB_CONF_FILE,
createApplicationResource(defaultFileContext,
jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) {
- appContext.setResourceTodo(MRConstants.JOB_JAR,
+ localResources.put(MRConstants.JOB_JAR,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, MRConstants.JOB_JAR)));
} else {
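
Note on the hunks above: the submission artifacts (job conf, job jar, split files) are now collected into a plain Map<String, LocalResource> and attached to the AM container later in this diff, instead of being set on the ApplicationSubmissionContext one entry at a time. A hedged sketch of what one such LocalResource record contains, modelled on the createLocalResource() helper this commit removes further down (the wrapper class and the fixed type/visibility are illustrative):

import java.net.URI;

import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.ConverterUtils;

// Sketch based on the removed createLocalResource(): a LocalResource is the
// file's URL plus its type, visibility, size and timestamp, which the
// NodeManager uses to localize and verify the artifact before container launch.
final class LocalResourceSketch {
  static LocalResource of(URI uri, long size, long timestamp) {
    LocalResource resource = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(LocalResource.class);
    resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
    resource.setType(LocalResourceType.FILE);
    resource.setVisibility(LocalResourceVisibility.PRIVATE);
    resource.setSize(size);
    resource.setTimestamp(timestamp);
    return resource;
  }
}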
@@ -312,30 +308,21 @@ public class YARNRunner implements Clien
// TODO gross hack
for (String s : new String[] { "job.split", "job.splitmetainfo",
MRConstants.APPLICATION_TOKENS_FILE }) {
- appContext.setResourceTodo(
+ localResources.put(
MRConstants.JOB_SUBMIT_DIR + "/" + s,
- createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
- }
-
- // TODO: Only if security is on.
- List<String> fsTokens = new ArrayList<String>();
- for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
- fsTokens.add(token.encodeToUrlString());
+ createApplicationResource(defaultFileContext,
+ new Path(jobSubmitDir, s)));
}
- // TODO - Remove this!
- appContext.addAllFsTokens(fsTokens);
- DataOutputBuffer dob = new DataOutputBuffer();
- ts.writeTokenStorageToStream(dob);
- appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+ // Setup security tokens
+ ByteBuffer securityTokens = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
- // Add queue information
- appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME));
-
- // Add job name
- appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A"));
-
- // Add the command line
+ // Setup the command to run the AM
String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
vargs.add(javaHome + "/bin/java");
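
Note on the hunk above: security tokens are now serialized only when security is enabled, rather than always being copied onto the context both as URL-encoded strings and as a raw buffer. A standalone sketch of that serialization step, using the same Credentials and DataOutputBuffer calls shown above (the wrapper class is illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch of the token handling above: when security is on, the job's
// Credentials are written to a DataOutputBuffer and wrapped into the ByteBuffer
// later attached to the AM container; otherwise null is passed along.
final class TokenSerializationSketch {
  static ByteBuffer serialize(Credentials ts) throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
      return null;
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }
}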
@@ -346,13 +333,6 @@ public class YARNRunner implements Clien
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
- // Add { job jar, MR app jar } to classpath.
- Map<String, String> environment = new HashMap<String, String>();
- MRApps.setInitialClasspath(environment);
- MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
- MRApps.addToClassPath(environment,
- MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
- appContext.addAllEnvironment(environment);
vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
vargs.add(String.valueOf(applicationId.getId()));
@@ -370,140 +350,43 @@ public class YARNRunner implements Clien
LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
+
+ // Setup the environment - Add { job jar, MR app jar } to classpath.
+ Map<String, String> environment = new HashMap<String, String>();
+ MRApps.setInitialClasspath(environment);
+ MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
+ MRApps.addToClassPath(environment,
+ MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
- appContext.addAllCommands(vargsFinal);
- // TODO: RM should get this from RPC.
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- return appContext;
- }
+ // Parse distributed cache
+ MRApps.setupDistributedCache(jobConf, localResources, environment);
- /**
- * * TODO: Copied for now from TaskAttemptImpl.java ... fixme
- * @param strs
- * @return
- */
- private static long[] parseTimeStamps(String[] strs) {
- if (null == strs) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
+ // Setup ContainerLaunchContext for AM container
+ ContainerLaunchContext amContainer =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ amContainer.setResource(capability); // Resource (mem) required
+ amContainer.setLocalResources(localResources); // Local resources
+ amContainer.setEnvironment(environment); // Environment
+ amContainer.setCommands(vargsFinal); // Command for AM
+ amContainer.setContainerTokens(securityTokens); // Security tokens
- /**
- * TODO: Copied for now from TaskAttemptImpl.java ... fixme
- *
- * TODO: This is currently needed in YarnRunner as user code like setupJob,
- * cleanupJob may need access to dist-cache. Once we separate distcache for
- * maps, reduces, setup etc, this can include only a subset of artificats.
- * This is also needed for uberAM case where we run everything inside AM.
- */
- private void setupDistributedCache(Configuration conf,
- ApplicationSubmissionContext container) throws IOException {
-
- // Cache archives
- parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE,
- DistributedCache.getCacheArchives(conf),
- parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
- DistributedCache.getArchiveVisibilities(conf),
- DistributedCache.getArchiveClassPaths(conf));
-
- // Cache files
- parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE,
- DistributedCache.getCacheFiles(conf),
- parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
- DistributedCache.getFileVisibilities(conf),
- DistributedCache.getFileClassPaths(conf));
- }
-
- // TODO - Move this to MR!
- // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
- private void parseDistributedCacheArtifacts(Configuration conf,
- ApplicationSubmissionContext container, LocalResourceType type,
- URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] pathsToPutOnClasspath) throws IOException {
-
- if (uris != null) {
- // Sanity check
- if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
- (uris.length != visibilities.length)) {
- throw new IllegalArgumentException("Invalid specification for " +
- "distributed-cache artifacts of type " + type + " :" +
- " #uris=" + uris.length +
- " #timestamps=" + timestamps.length +
- " #visibilities=" + visibilities.length
- );
- }
-
- Map<String, Path> classPaths = new HashMap<String, Path>();
- if (pathsToPutOnClasspath != null) {
- for (Path p : pathsToPutOnClasspath) {
- FileSystem fs = p.getFileSystem(conf);
- p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
- classPaths.put(p.toUri().getPath().toString(), p);
- }
- }
- for (int i = 0; i < uris.length; ++i) {
- URI u = uris[i];
- Path p = new Path(u);
- FileSystem fs = p.getFileSystem(conf);
- p = fs.resolvePath(
- p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- // Add URI fragment or just the filename
- Path name = new Path((null == u.getFragment())
- ? p.getName()
- : u.getFragment());
- if (name.isAbsolute()) {
- throw new IllegalArgumentException("Resource name must be relative");
- }
- String linkName = name.toUri().getPath();
- container.setResourceTodo(
- linkName,
- createLocalResource(
- p.toUri(), type,
- visibilities[i]
- ? LocalResourceVisibility.PUBLIC
- : LocalResourceVisibility.PRIVATE,
- sizes[i], timestamps[i])
- );
- if (classPaths.containsKey(u.getPath())) {
- Map<String, String> environment = container.getAllEnvironment();
- MRApps.addToClassPath(environment, linkName);
- }
- }
- }
- }
+ // Set up the ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(applicationId); // ApplicationId
+ appContext.setUser( // User name
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ appContext.setQueue( // Queue name
+ jobConf.get(JobContext.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
+ appContext.setApplicationName( // Job name
+ jobConf.get(JobContext.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
+ appContext.setAMContainerSpec(amContainer); // AM Container
- // TODO - Move this to MR!
- private static long[] getFileSizes(Configuration conf, String key) {
- String[] strs = conf.getStrings(key);
- if (strs == null) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
- private LocalResource createLocalResource(URI uri,
- LocalResourceType type, LocalResourceVisibility visibility,
- long size, long timestamp) throws IOException {
- LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
- resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
- resource.setType(type);
- resource.setVisibility(visibility);
- resource.setSize(size);
- resource.setTimestamp(timestamp);
- return resource;
+ return appContext;
}
-
+
@Override
public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException {