Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/04/24 21:05:20 UTC
svn commit: r1329947 [2/4] - in
/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/
hadoop-mapreduce-...
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Tue Apr 24 19:05:09 2012
@@ -25,12 +25,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jo
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* This class provides a way to interact with history files in a thread safe
* manner.
@@ -67,33 +74,251 @@ public class HistoryFileManager extends
private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+ private static enum HistoryInfoState {
+ IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
+ };
+
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
.doneSubdirsBeforeSerialTail();
- public static class MetaInfo {
+ /**
+ * Maps between a serial number (generated based on jobId) and the timestamp
+ * component(s) to which it belongs. Facilitates jobId-based searches. If a
+ * jobId's serial number is not present in this index, the corresponding job
+ * cannot be found.
+ */
+ private static class SerialNumberIndex {
+ private SortedMap<String, Set<String>> cache;
+ private int maxSize;
+
+ public SerialNumberIndex(int maxSize) {
+ this.cache = new TreeMap<String, Set<String>>();
+ this.maxSize = maxSize;
+ }
+
+ public synchronized void add(String serialPart, String timestampPart) {
+ if (!cache.containsKey(serialPart)) {
+ cache.put(serialPart, new HashSet<String>());
+ if (cache.size() > maxSize) {
+ String key = cache.firstKey();
+ LOG.error("Dropping " + key
+ + " from the SerialNumberIndex. We will no "
+ + "longer be able to see jobs that are in that serial index for "
+ + cache.get(key));
+ cache.remove(key);
+ }
+ }
+ Set<String> datePartSet = cache.get(serialPart);
+ datePartSet.add(timestampPart);
+ }
+
+ public synchronized void remove(String serialPart, String timeStampPart) {
+ if (cache.containsKey(serialPart)) {
+ Set<String> set = cache.get(serialPart);
+ set.remove(timeStampPart);
+ if (set.isEmpty()) {
+ cache.remove(serialPart);
+ }
+ }
+ }
+
+ public synchronized Set<String> get(String serialPart) {
+ Set<String> found = cache.get(serialPart);
+ if (found != null) {
+ return new HashSet<String>(found);
+ }
+ return null;
+ }
+ }
+
+ private static class JobListCache {
+ private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+ private int maxSize;
+ private long maxAge;
+
+ public JobListCache(int maxSize, long maxAge) {
+ this.maxSize = maxSize;
+ this.maxAge = maxAge;
+ this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+ }
+
+ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
+ JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding " + jobId + " to job list cache with "
+ + fileInfo.getJobIndexInfo());
+ }
+ HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
+ if (cache.size() > maxSize) {
+ //There is a race here, where more than one thread could be trying to
+ // remove entries. This could result in too many entries being removed
+ // from the cache. This is considered OK, as the cache should be fairly
+ // large and we prefer performance over keeping the cache size exactly
+ // at the maximum.
+ Iterator<JobId> keys = cache.navigableKeySet().iterator();
+ long cutoff = System.currentTimeMillis() - maxAge;
+ while(cache.size() > maxSize && keys.hasNext()) {
+ JobId key = keys.next();
+ HistoryFileInfo firstValue = cache.get(key);
+ if(firstValue != null) {
+ synchronized(firstValue) {
+ if (firstValue.isMovePending()) {
+ if(firstValue.didMoveFail() &&
+ firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
+ cache.remove(key);
+ //Now lets try to delete it
+ try {
+ firstValue.delete();
+ } catch (IOException e) {
+ LOG.error("Error while trying to delete history files" +
+ " that could not be moved to done.", e);
+ }
+ } else {
+ LOG.warn("Waiting to remove " + key
+ + " from JobListCache because it is not in done yet.");
+ }
+ } else {
+ cache.remove(key);
+ }
+ }
+ }
+ }
+ }
+ return old;
+ }
+
+ public void delete(HistoryFileInfo fileInfo) {
+ cache.remove(fileInfo.getJobId());
+ }
+
+ public Collection<HistoryFileInfo> values() {
+ return new ArrayList<HistoryFileInfo>(cache.values());
+ }
+
+ public HistoryFileInfo get(JobId jobId) {
+ return cache.get(jobId);
+ }
+ }
+
+ public class HistoryFileInfo {
private Path historyFile;
private Path confFile;
private Path summaryFile;
private JobIndexInfo jobIndexInfo;
+ private HistoryInfoState state;
- public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
- JobIndexInfo jobIndexInfo) {
+ private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+ JobIndexInfo jobIndexInfo, boolean isInDone) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
+ state = isInDone ? HistoryInfoState.IN_DONE
+ : HistoryInfoState.IN_INTERMEDIATE;
}
- private Path getHistoryFile() {
- return historyFile;
+ private synchronized boolean isMovePending() {
+ return state == HistoryInfoState.IN_INTERMEDIATE
+ || state == HistoryInfoState.MOVE_FAILED;
}
- private Path getConfFile() {
- return confFile;
+ private synchronized boolean didMoveFail() {
+ return state == HistoryInfoState.MOVE_FAILED;
+ }
+
+ /**
+ * @return true if the files backing this entry have been deleted.
+ */
+ public synchronized boolean isDeleted() {
+ return state == HistoryInfoState.DELETED;
}
- private Path getSummaryFile() {
- return summaryFile;
+ private synchronized void moveToDone() throws IOException {
+ if (!isMovePending()) {
+ // It was either deleted or is already in done. Either way do nothing
+ return;
+ }
+ try {
+ long completeTime = jobIndexInfo.getFinishTime();
+ if (completeTime == 0) {
+ completeTime = System.currentTimeMillis();
+ }
+ JobId jobId = jobIndexInfo.getJobId();
+
+ List<Path> paths = new ArrayList<Path>(2);
+ if (historyFile == null) {
+ LOG.info("No file for job-history with " + jobId + " found in cache!");
+ } else {
+ paths.add(historyFile);
+ }
+
+ if (confFile == null) {
+ LOG.info("No file for jobConf with " + jobId + " found in cache!");
+ } else {
+ paths.add(confFile);
+ }
+
+ if (summaryFile == null) {
+ LOG.info("No summary file for job: " + jobId);
+ } else {
+ String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+ summaryFile);
+ SUMMARY_LOG.info(jobSummaryString);
+ LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+ intermediateDoneDirFc.delete(summaryFile, false);
+ summaryFile = null;
+ }
+
+ Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+ addDirectoryToSerialNumberIndex(targetDir);
+ makeDoneSubdir(targetDir);
+ if (historyFile != null) {
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+ .getName()));
+ if (!toPath.equals(historyFile)) {
+ moveToDoneNow(historyFile, toPath);
+ historyFile = toPath;
+ }
+ }
+ if (confFile != null) {
+ Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+ .getName()));
+ if (!toPath.equals(confFile)) {
+ moveToDoneNow(confFile, toPath);
+ confFile = toPath;
+ }
+ }
+ state = HistoryInfoState.IN_DONE;
+ } catch (Throwable t) {
+ LOG.error("Error while trying to move a job to done", t);
+ this.state = HistoryInfoState.MOVE_FAILED;
+ }
+ }
+
+ /**
+ * Parse a job from the JobHistoryFile, if the underlying file is not going
+ * to be deleted.
+ *
+ * @return the Job or null if the underlying file was deleted.
+ * @throws IOException
+ * if there is an error trying to read the file.
+ */
+ public synchronized Job loadJob() throws IOException {
+ return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+ false, jobIndexInfo.getUser(), this, aclsMgr);
+ }
+
+ /**
+ * Return the history file. This should only be used for testing.
+ * @return the history file.
+ */
+ synchronized Path getHistoryFile() {
+ return historyFile;
+ }
+
+ private synchronized void delete() throws IOException {
+ state = HistoryInfoState.DELETED;
+ doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+ doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
}
public JobIndexInfo getJobIndexInfo() {
@@ -104,57 +329,35 @@ public class HistoryFileManager extends
return jobIndexInfo.getJobId();
}
- private void setHistoryFile(Path historyFile) {
- this.historyFile = historyFile;
- }
-
- private void setConfFile(Path confFile) {
- this.confFile = confFile;
+ public synchronized Path getConfFile() {
+ return confFile;
}
-
- private void setSummaryFile(Path summaryFile) {
- this.summaryFile = summaryFile;
+
+ public synchronized Configuration loadConfFile() throws IOException {
+ FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
+ Configuration jobConf = new Configuration(false);
+ jobConf.addResource(fc.open(confFile));
+ return jobConf;
}
}
- /**
- * Maps between a serial number (generated based on jobId) and the timestamp
- * component(s) to which it belongs. Facilitates jobId based searches. If a
- * jobId is not found in this list - it will not be found.
- */
- private final SortedMap<String, Set<String>> idToDateString =
- new TreeMap<String, Set<String>>();
- // The number of entries in idToDateString
- private int dateStringCacheSize;
-
- // Maintains minimal details for recent jobs (parsed from history file name).
- // Sorted on Job Completion Time.
- private final SortedMap<JobId, MetaInfo> jobListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>();
- // The number of jobs to maintain in the job list cache.
- private int jobListCacheSize;
-
- // Re-use existing MetaInfo objects if they exist for the specific JobId.
- // (synchronization on MetaInfo)
- // Check for existence of the object when using iterators.
- private final SortedMap<JobId, MetaInfo> intermediateListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>();
+ private SerialNumberIndex serialNumberIndex = null;
+ private JobListCache jobListCache = null;
// Maintains a list of known done subdirectories.
- private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+ private final Set<Path> existingDoneSubdirs = Collections
+ .synchronizedSet(new HashSet<Path>());
/**
* Maintains a mapping between intermediate user directories and the last
* known modification time.
*/
- private Map<String, Long> userDirModificationTimeMap =
- new HashMap<String, Long>();
+ private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
private JobACLsManager aclsMgr;
private Configuration conf;
- // TODO Remove me!!!!
private boolean debugMode;
private String serialNumberFormat;
@@ -165,6 +368,9 @@ public class HistoryFileManager extends
private FileContext intermediateDoneDirFc; // Intermediate Done Dir
// FileContext
+ private ThreadPoolExecutor moveToDoneExecutor = null;
+ private long maxHistoryAge = 0;
+
public HistoryFileManager() {
super(HistoryFileManager.class.getName());
}
@@ -211,12 +417,25 @@ public class HistoryFileManager extends
this.aclsMgr = new JobACLsManager(conf);
- jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
- JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+ maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+
+ jobListCache = new JobListCache(conf.getInt(
+ JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+ JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
+ maxHistoryAge);
- dateStringCacheSize = conf.getInt(
+ serialNumberIndex = new SerialNumberIndex(conf.getInt(
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
- JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+ JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
+
+ int numMoveThreads = conf.getInt(
+ JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+ "MoveIntermediateToDone Thread #%d").build();
+ moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
+ 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
super.init(conf);
}
@@ -249,6 +468,7 @@ public class HistoryFileManager extends
void initExisting() throws IOException {
LOG.info("Initializing Existing Jobs...");
List<FileStatus> timestampedDirList = findTimestampedDirectories();
+ // Sort first just so insertion is in a consistent order
Collections.sort(timestampedDirList);
for (FileStatus fs : timestampedDirList) {
// TODO Could verify the correct format for these directories.
@@ -271,16 +491,7 @@ public class HistoryFileManager extends
+ serialDirPath.toString() + ". Continuing with next");
return;
}
- synchronized (idToDateString) {
- // TODO make this thread safe without the synchronize
- if (idToDateString.containsKey(serialPart)) {
- Set<String> set = idToDateString.get(serialPart);
- set.remove(timeStampPart);
- if (set.isEmpty()) {
- idToDateString.remove(serialPart);
- }
- }
- }
+ serialNumberIndex.remove(serialPart, timeStampPart);
}
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
@@ -299,21 +510,7 @@ public class HistoryFileManager extends
LOG.warn("Could not find serial portion from path: "
+ serialDirPath.toString() + ". Continuing with next");
}
- addToSerialNumberIndex(serialPart, timestampPart);
- }
-
- private void addToSerialNumberIndex(String serialPart, String timestampPart) {
- synchronized (idToDateString) {
- // TODO make this thread safe without the synchronize
- if (!idToDateString.containsKey(serialPart)) {
- idToDateString.put(serialPart, new HashSet<String>());
- if (idToDateString.size() > dateStringCacheSize) {
- idToDateString.remove(idToDateString.firstKey());
- }
- Set<String> datePartSet = idToDateString.get(serialPart);
- datePartSet.add(timestampPart);
- }
- }
+ serialNumberIndex.add(serialPart, timestampPart);
}
private void addDirectoryToJobListCache(Path path) throws IOException {
@@ -332,10 +529,10 @@ public class HistoryFileManager extends
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- addToJobListCache(metaInfo);
+ HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+ .getPath().getParent(), confFileName), new Path(fs.getPath()
+ .getParent(), summaryFileName), jobIndexInfo, true);
+ jobListCache.addIfAbsent(fileInfo);
}
}
@@ -371,25 +568,18 @@ public class HistoryFileManager extends
return fsList;
}
- private void addToJobListCache(MetaInfo metaInfo) {
- JobId jobId = metaInfo.getJobIndexInfo().getJobId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + jobId + " to job list cache with "
- + metaInfo.getJobIndexInfo());
- }
- jobListCache.put(jobId, metaInfo);
- if (jobListCache.size() > jobListCacheSize) {
- jobListCache.remove(jobListCache.firstKey());
- }
- }
-
/**
* Scans the intermediate directory to find user directories. Scans these for
- * history files if the modification time for the directory has changed.
+ * history files if the modification time for the directory has changed. Once
+ * it finds history files it starts the process of moving them to the done
+ * directory.
*
* @throws IOException
+ * if there was an error while scanning
*/
- private void scanIntermediateDirectory() throws IOException {
+ void scanIntermediateDirectory() throws IOException {
+ // TODO it would be great to limit how often this happens, except in the
+ // case where we are looking for a particular job.
List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
intermediateDoneDirFc, intermediateDoneDirPath, "");
@@ -405,7 +595,12 @@ public class HistoryFileManager extends
}
}
if (shouldScan) {
- scanIntermediateDirectory(userDir.getPath());
+ try {
+ scanIntermediateDirectory(userDir.getPath());
+ } catch (IOException e) {
+ LOG.error("Error while trying to scan the directory "
+ + userDir.getPath(), e);
+ }
}
}
}
@@ -426,11 +621,33 @@ public class HistoryFileManager extends
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
- intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+ HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+ .getPath().getParent(), confFileName), new Path(fs.getPath()
+ .getParent(), summaryFileName), jobIndexInfo, false);
+
+ final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
+ if (old == null || old.didMoveFail()) {
+ final HistoryFileInfo found = (old == null) ? fileInfo : old;
+ long cutoff = System.currentTimeMillis() - maxHistoryAge;
+ if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
+ try {
+ found.delete();
+ } catch (IOException e) {
+ LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
+ }
+ } else {
+ moveToDoneExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ found.moveToDone();
+ } catch (IOException e) {
+ LOG.info("Failed to process fileInfo for job: " +
+ found.getJobId(), e);
+ }
+ }
+ });
+ }
}
}
}
@@ -442,11 +659,11 @@ public class HistoryFileManager extends
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
- * @return A MetaInfo object for the jobId, null if not found.
+ * @return A HistoryFileInfo object for the jobId, or null if not found.
* @throws IOException
*/
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
- throws IOException {
+ private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
+ JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
@@ -455,10 +672,10 @@ public class HistoryFileManager extends
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- return metaInfo;
+ HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+ fs.getPath().getParent(), confFileName), new Path(fs.getPath()
+ .getParent(), summaryFileName), jobIndexInfo, true);
+ return fileInfo;
}
}
return null;
@@ -474,175 +691,51 @@ public class HistoryFileManager extends
* @return
* @throws IOException
*/
- private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
String boxedSerialNumber = String.valueOf(jobSerialNumber);
- Set<String> dateStringSet;
- synchronized (idToDateString) {
- Set<String> found = idToDateString.get(boxedSerialNumber);
- if (found == null) {
- return null;
- } else {
- dateStringSet = new HashSet<String>(found);
- }
+ Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
+ if (dateStringSet == null) {
+ return null;
}
for (String timestampPart : dateStringSet) {
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
doneDirFc);
- MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
- if (metaInfo != null) {
- return metaInfo;
+ HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
+ if (fileInfo != null) {
+ return fileInfo;
}
}
return null;
}
- /**
- * Checks for the existence of the job history file in the intermediate
- * directory.
- *
- * @param jobId
- * @return
- * @throws IOException
- */
- private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
- scanIntermediateDirectory();
- return intermediateListCache.get(jobId);
- }
-
- /**
- * Parse a job from the JobHistoryFile, if the underlying file is not going to
- * be deleted.
- *
- * @param metaInfo
- * the where the JobHistory is stored.
- * @return the Job or null if the underlying file was deleted.
- * @throws IOException
- * if there is an error trying to read the file.
- */
- public Job loadJob(MetaInfo metaInfo) throws IOException {
- return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
- metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
- metaInfo.getConfFile(), aclsMgr);
- }
-
- public Collection<MetaInfo> getAllMetaInfo() throws IOException {
+ public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
scanIntermediateDirectory();
- ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
- result.addAll(intermediateListCache.values());
- result.addAll(jobListCache.values());
- return result;
+ return jobListCache.values();
}
- Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
- scanIntermediateDirectory();
- return intermediateListCache.values();
- }
-
- public MetaInfo getMetaInfo(JobId jobId) throws IOException {
- // MetaInfo available in cache.
- MetaInfo metaInfo = null;
- if (jobListCache.containsKey(jobId)) {
- metaInfo = jobListCache.get(jobId);
- }
-
- if (metaInfo != null) {
- return metaInfo;
+ public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
+ // FileInfo available in cache.
+ HistoryFileInfo fileInfo = jobListCache.get(jobId);
+ if (fileInfo != null) {
+ return fileInfo;
}
-
- // MetaInfo not available. Check intermediate directory for meta info.
- metaInfo = scanIntermediateForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
+ // Not in the cache, so scan the intermediate directory to be sure we
+ // did not miss it there
+ scanIntermediateDirectory();
+ fileInfo = jobListCache.get(jobId);
+ if (fileInfo != null) {
+ return fileInfo;
}
// Intermediate directory does not contain job. Search through older ones.
- metaInfo = scanOldDirsForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
+ fileInfo = scanOldDirsForJob(jobId);
+ if (fileInfo != null) {
+ return fileInfo;
}
return null;
}
- void moveToDone(MetaInfo metaInfo) throws IOException {
- long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
- if (completeTime == 0)
- completeTime = System.currentTimeMillis();
- JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
- List<Path> paths = new ArrayList<Path>();
- Path historyFile = metaInfo.getHistoryFile();
- if (historyFile == null) {
- LOG.info("No file for job-history with " + jobId + " found in cache!");
- } else {
- paths.add(historyFile);
- }
-
- Path confFile = metaInfo.getConfFile();
- if (confFile == null) {
- LOG.info("No file for jobConf with " + jobId + " found in cache!");
- } else {
- paths.add(confFile);
- }
-
- // TODO Check all mi getters and setters for the conf path
- Path summaryFile = metaInfo.getSummaryFile();
- if (summaryFile == null) {
- LOG.info("No summary file for job: " + jobId);
- } else {
- try {
- String jobSummaryString = getJobSummary(intermediateDoneDirFc,
- summaryFile);
- SUMMARY_LOG.info(jobSummaryString);
- LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
- intermediateDoneDirFc.delete(summaryFile, false);
- metaInfo.setSummaryFile(null);
- } catch (IOException e) {
- LOG.warn("Failed to process summary file: [" + summaryFile + "]");
- throw e;
- }
- }
-
- Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
- addDirectoryToSerialNumberIndex(targetDir);
- try {
- makeDoneSubdir(targetDir);
- } catch (IOException e) {
- LOG.warn("Failed creating subdirectory: " + targetDir
- + " while attempting to move files for jobId: " + jobId);
- throw e;
- }
- synchronized (metaInfo) {
- if (historyFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
- .getName()));
- try {
- moveToDoneNow(historyFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setHistoryFile(toPath);
- }
- if (confFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
- .getName()));
- try {
- moveToDoneNow(confFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setConfFile(toPath);
- }
- }
- addToJobListCache(metaInfo);
- intermediateListCache.remove(jobId);
- }
-
private void moveToDoneNow(final Path src, final Path target)
throws IOException {
LOG.info("Moving " + src.toString() + " to " + target.toString());
@@ -658,20 +751,9 @@ public class HistoryFileManager extends
}
private void makeDoneSubdir(Path path) throws IOException {
- boolean existsInExistingCache = false;
- synchronized (existingDoneSubdirs) {
- if (existingDoneSubdirs.contains(path))
- existsInExistingCache = true;
- }
try {
doneDirFc.getFileStatus(path);
- if (!existsInExistingCache) {
- existingDoneSubdirs.add(path);
- if (LOG.isDebugEnabled()) {
- LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
- + " already existed, but it didn't.");
- }
- }
+ existingDoneSubdirs.add(path);
} catch (FileNotFoundException fnfE) {
try {
FsPermission fsp = new FsPermission(
@@ -685,11 +767,8 @@ public class HistoryFileManager extends
+ ", " + fsp);
doneDirFc.setPermission(path, fsp);
}
- synchronized (existingDoneSubdirs) {
- existingDoneSubdirs.add(path);
- }
- } catch (FileAlreadyExistsException faeE) {
- // Nothing to do.
+ existingDoneSubdirs.add(path);
+ } catch (FileAlreadyExistsException faeE) { // Nothing to do.
}
}
}
@@ -713,16 +792,22 @@ public class HistoryFileManager extends
return finishTime;
}
- private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
- jobListCache.remove(metaInfo.getJobId());
- doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
- doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+ private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
+ jobListCache.delete(fileInfo);
+ fileInfo.delete();
}
+ /**
+ * Clean up older history files.
+ *
+ * @throws IOException
+ * on any error trying to remove the entries.
+ */
@SuppressWarnings("unchecked")
- void clean(long cutoff, HistoryStorage storage) throws IOException {
+ void clean() throws IOException {
// TODO this should be replaced by something that knows about the directory
// structure and will put less of a load on HDFS.
+ long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
// TODO Delete YYYY/MM/DD directories.
List<FileStatus> serialDirList = findTimestampedDirectories();
@@ -737,13 +822,17 @@ public class HistoryFileManager extends
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
- historyFile.getPath().getParent(), confFileName), null,
- jobIndexInfo);
- storage.jobRemovedFromHDFS(metaInfo.getJobId());
- deleteJobFromDone(metaInfo);
+ HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
+ .getJobId());
+ if (fileInfo == null) {
+ String confFileName = JobHistoryUtils
+ .getIntermediateConfFileName(jobIndexInfo.getJobId());
+
+ fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+ historyFile.getPath().getParent(), confFileName), null,
+ jobIndexInfo, true);
+ }
+ deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
@@ -752,9 +841,7 @@ public class HistoryFileManager extends
if (!halted) {
doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
- synchronized (existingDoneSubdirs) {
- existingDoneSubdirs.remove(serialDir.getPath());
- }
+ existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java Tue Apr 24 19:05:09 2012
@@ -28,7 +28,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
/**
- * Provides an API to query jobs that have finished.
+ * Provides an API to query jobs that have finished.
+ *
+ * Implementations should be aware that they receive no direct feedback
+ * when files are removed from HDFS. Unless a complete copy of the data
+ * has been made, implementations may rely on HistoryFileManager to
+ * determine when the underlying files have been removed.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -71,10 +76,4 @@ public interface HistoryStorage {
* @return the job, or null if it is not found.
*/
Job getFullJob(JobId jobId);
-
- /**
- * Informs the Storage that a job has been removed from HDFS
- * @param jobId the ID of the job that was removed.
- */
- void jobRemovedFromHDFS(JobId jobId);
}
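
With jobRemovedFromHDFS() removed from the interface, a HistoryStorage
implementation must notice deletions itself. A hypothetical helper, sketched
against the public HistoryFileManager methods added in this commit
(getFileInfo, isDeleted, loadJob); it shows one possible approach and is not
part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;

    public class StorageSketch {
      // Hypothetical helper: check whether the backing files still exist
      // before serving a job, instead of waiting for a removal callback.
      static Job getFullJobSketch(JobId jobId, HistoryFileManager hsManager) {
        try {
          HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
          if (fileInfo == null || fileInfo.isDeleted()) {
            return null; // files are gone from HDFS; drop any cached copy
          }
          return fileInfo.loadJob();
        } catch (IOException e) {
          return null; // could not scan or read the history files
        }
      }
    }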
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Apr 24 19:05:09 2012
@@ -21,10 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -37,7 +34,7 @@ import org.apache.hadoop.mapreduce.TypeC
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.ReflectionUtils;
@@ -66,15 +63,9 @@ public class JobHistory extends Abstract
// Time interval for the move thread.
private long moveThreadInterval;
- // Number of move threads.
- private int numMoveThreads;
-
private Configuration conf;
- private Thread moveIntermediateToDoneThread = null;
- private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
-
- private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
+ private ScheduledThreadPoolExecutor scheduledExecutor = null;
private HistoryStorage storage = null;
private HistoryFileManager hsManager = null;
@@ -91,8 +82,6 @@ public class JobHistory extends Abstract
moveThreadInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
- numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
- JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
hsManager = new HistoryFileManager();
hsManager.init(conf);
@@ -120,27 +109,22 @@ public class JobHistory extends Abstract
((Service) storage).start();
}
- // Start moveIntermediatToDoneThread
- moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
- moveThreadInterval, numMoveThreads);
- moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
- moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
- moveIntermediateToDoneThread.start();
+ scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+ new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
+ .build());
+
+ scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
+ moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
// Start historyCleaner
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
- long maxAgeOfHistoryFiles = conf.getLong(
- JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
- JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
- cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
long runInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
- cleanerScheduledExecutor
- .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+ scheduledExecutor
+ .scheduleAtFixedRate(new HistoryCleaner(),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
}
super.start();
@@ -149,24 +133,12 @@ public class JobHistory extends Abstract
@Override
public void stop() {
LOG.info("Stopping JobHistory");
- if (moveIntermediateToDoneThread != null) {
- LOG.info("Stopping move thread");
- moveIntermediateToDoneRunnable.stop();
- moveIntermediateToDoneThread.interrupt();
- try {
- LOG.info("Joining on move thread");
- moveIntermediateToDoneThread.join();
- } catch (InterruptedException e) {
- LOG.info("Interrupted while stopping move thread");
- }
- }
-
- if (cleanerScheduledExecutor != null) {
- LOG.info("Stopping History Cleaner");
- cleanerScheduledExecutor.shutdown();
+ if (scheduledExecutor != null) {
+ LOG.info("Stopping History Cleaner/Move To Done");
+ scheduledExecutor.shutdown();
boolean interrupted = false;
long currentTime = System.currentTimeMillis();
- while (!cleanerScheduledExecutor.isShutdown()
+ while (!scheduledExecutor.isTerminated()
&& System.currentTimeMillis() < currentTime + 1000l && !interrupted) {
try {
Thread.sleep(20);
@@ -174,8 +146,10 @@ public class JobHistory extends Abstract
interrupted = true;
}
}
- if (!cleanerScheduledExecutor.isShutdown()) {
- LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+ if (!scheduledExecutor.isTerminated()) {
+ LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
+ "succeeded, Forcing a shutdown");
+ scheduledExecutor.shutdownNow();
}
}
if (storage instanceof Service) {
@@ -195,68 +169,34 @@ public class JobHistory extends Abstract
}
private class MoveIntermediateToDoneRunnable implements Runnable {
-
- private long sleepTime;
- private ThreadPoolExecutor moveToDoneExecutor = null;
- private boolean running = false;
-
- public synchronized void stop() {
- running = false;
- notify();
- }
-
- MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
- this.sleepTime = sleepTime;
- ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
- "MoveIntermediateToDone Thread #%d").build();
- moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
- TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
- running = true;
- }
-
@Override
public void run() {
- Thread.currentThread().setName("IntermediateHistoryScanner");
try {
- while (true) {
- LOG.info("Starting scan to move intermediate done files");
- for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
- moveToDoneExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- hsManager.moveToDone(metaInfo);
- } catch (IOException e) {
- LOG.info(
- "Failed to process metaInfo for job: "
- + metaInfo.getJobId(), e);
- }
- }
- });
- }
- synchronized (this) {
- try {
- this.wait(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("IntermediateHistoryScannerThread interrupted");
- }
- if (!running) {
- break;
- }
- }
- }
+ LOG.info("Starting scan to move intermediate done files");
+ hsManager.scanIntermediateDirectory();
} catch (IOException e) {
- LOG.warn("Unable to get a list of intermediate files to be moved");
- // TODO Shut down the entire process!!!!
+ LOG.error("Error while scanning intermediate done dir ", e);
}
}
}
+
+ private class HistoryCleaner implements Runnable {
+ public void run() {
+ LOG.info("History Cleaner started");
+ try {
+ hsManager.clean();
+ } catch (IOException e) {
+ LOG.warn("Error trying to clean up ", e);
+ }
+ LOG.info("History Cleaner complete");
+ }
+ }
/**
* Helper method for test cases.
*/
- MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
- return hsManager.getMetaInfo(jobId);
+ HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException {
+ return hsManager.getFileInfo(jobId);
}
@Override
@@ -313,25 +253,6 @@ public class JobHistory extends Abstract
fBegin, fEnd, jobState);
}
- public class HistoryCleaner implements Runnable {
- long maxAgeMillis;
-
- public HistoryCleaner(long maxAge) {
- this.maxAgeMillis = maxAge;
- }
-
- public void run() {
- LOG.info("History Cleaner started");
- long cutoff = System.currentTimeMillis() - maxAgeMillis;
- try {
- hsManager.clean(cutoff, storage);
- } catch (IOException e) {
- LOG.warn("Error trying to clean up ", e);
- }
- LOG.info("History Cleaner complete");
- }
- }
-
// TODO AppContext - Not Required
private ApplicationAttemptId appAttemptID;
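
Both periodic jobs now share a single ScheduledThreadPoolExecutor in place of
the hand-rolled wait/notify move thread and the separate cleaner executor. A
self-contained sketch of the scheduling pattern follows; the intervals below
are illustrative stand-ins, the real values come from JHAdminConfig:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class SchedulerSketch {
      public static void main(String[] args) {
        // One executor, two periodic tasks: the intermediate-directory
        // scanner and the history cleaner.
        ScheduledThreadPoolExecutor scheduledExecutor =
            new ScheduledThreadPoolExecutor(2);

        Runnable mover = new Runnable() {
          public void run() {
            System.out.println("scanning for intermediate done files");
          }
        };
        Runnable cleaner = new Runnable() {
          public void run() {
            System.out.println("cleaning old history files");
          }
        };

        long moveIntervalMs = 3 * 60 * 1000L;        // illustrative value
        long cleanIntervalMs = 24 * 60 * 60 * 1000L; // illustrative value

        scheduledExecutor.scheduleAtFixedRate(mover, moveIntervalMs,
            moveIntervalMs, TimeUnit.MILLISECONDS);
        // The cleaner starts after a short fixed delay, as in the patch.
        scheduledExecutor.scheduleAtFixedRate(cleaner, 30 * 1000L,
            cleanIntervalMs, TimeUnit.MILLISECONDS);
      }
    }

On stop(), shutdown() lets in-flight runs finish; the patch falls back to
shutdownNow() if the executor has not terminated after roughly a second.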
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue Apr 24 19:05:09 2012
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
@@ -166,6 +167,11 @@ public class PartialJob implements org.a
public Path getConfFile() {
throw new IllegalStateException("Not implemented yet");
}
+
+ @Override
+ public Configuration loadConfFile() {
+ throw new IllegalStateException("Not implemented yet");
+ }
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java Tue Apr 24 19:05:09 2012
@@ -89,7 +89,8 @@ public class HsTaskPage extends HsView {
headRow.
th(".id", "Attempt").
th(".state", "State").
- th(".node", "node").
+ th(".node", "Node").
+ th(".logs", "Logs").
th(".tsh", "Start Time");
if(type == TaskType.REDUCE) {
@@ -144,10 +145,11 @@ public class HsTaskPage extends HsView {
_(taid)._().td(ta.getState().toString()).td().a(".nodelink",
"http://"+ nodeHttpAddr,
nodeRackName + "/" + nodeHttpAddr);
- td._(" ").a(".logslink",
- url("logs", nodeIdString, containerIdString, taid, app.getJob()
- .getUserName()), "logs");
td._();
+ row.td().
+ a(".logslink",
+ url("logs", nodeIdString, containerIdString, taid, app.getJob()
+ .getUserName()), "logs")._();
row.td().
br().$title(String.valueOf(attemptStartTime))._().
@@ -196,6 +198,8 @@ public class HsTaskPage extends HsView {
th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Node")._()._().
th().input("search_init").$type(InputType.text).
+ $name("attempt_node").$value("Logs")._()._().
+ th().input("search_init").$type(InputType.text).
$name("attempt_start_time").$value("Start Time")._()._();
if(type == TaskType.REDUCE) {
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Tue Apr 24 19:05:09 2012
@@ -65,7 +65,6 @@ import com.google.inject.Inject;
public class HsWebServices {
private final HistoryContext ctx;
private WebApp webapp;
- private final Configuration conf;
@Context
UriInfo uriInfo;
@@ -74,7 +73,6 @@ public class HsWebServices {
public HsWebServices(final HistoryContext ctx, final Configuration conf,
final WebApp webapp) {
this.ctx = ctx;
- this.conf = conf;
this.webapp = webapp;
}
@@ -222,7 +220,7 @@ public class HsWebServices {
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
ConfInfo info;
try {
- info = new ConfInfo(job, this.conf);
+ info = new ConfInfo(job);
} catch (IOException e) {
throw new NotFoundException("unable to load configuration for job: "
+ jid);
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Tue Apr 24 19:05:09 2012
@@ -22,12 +22,15 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import static org.mockito.Mockito.*;
+
@RunWith(value = Parameterized.class)
public class TestJobHistoryEntities {
@@ -61,10 +64,12 @@ public class TestJobHistoryEntities {
/* Verify some expected values based on the history file */
@Test
public void testCompletedJob() throws Exception {
+ HistoryFileInfo info = mock(HistoryFileInfo.class);
+ when(info.getConfFile()).thenReturn(fullConfPath);
//Re-initialize to verify the delayed load.
completedJob =
new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
- fullConfPath, jobAclsManager);
+ info, jobAclsManager);
//Verify tasks loaded based on loadTask parameter.
assertEquals(loadTasks, completedJob.tasksLoaded.get());
assertEquals(1, completedJob.getAMInfos().size());
@@ -84,9 +89,11 @@ public class TestJobHistoryEntities {
@Test
public void testCompletedTask() throws Exception {
+ HistoryFileInfo info = mock(HistoryFileInfo.class);
+ when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
- fullConfPath, jobAclsManager);
+ info, jobAclsManager);
TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@@ -111,9 +118,11 @@ public class TestJobHistoryEntities {
@Test
public void testCompletedTaskAttempt() throws Exception {
+ HistoryFileInfo info = mock(HistoryFileInfo.class);
+ when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
- fullConfPath, jobAclsManager);
+ info, jobAclsManager);
TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Apr 24 19:05:09 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -84,12 +85,22 @@ public class TestJobHistoryParsing {
@Test
public void testHistoryParsing() throws Exception {
- checkHistoryParsing(2, 1, 2);
+ LOG.info("STARTING testHistoryParsing()");
+ try {
+ checkHistoryParsing(2, 1, 2);
+ } finally {
+ LOG.info("FINISHED testHistoryParsing()");
+ }
}
@Test
public void testHistoryParsingWithParseErrors() throws Exception {
- checkHistoryParsing(3, 0, 2);
+ LOG.info("STARTING testHistoryParsingWithParseErrors()");
+ try {
+ checkHistoryParsing(3, 0, 2);
+ } finally {
+ LOG.info("FINISHED testHistoryParsingWithParseErrors()");
+ }
}
private static String getJobSummary(FileContext fc, Path path) throws IOException {
@@ -124,61 +135,112 @@ public class TestJobHistoryParsing {
String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf);
- JobHistory jobHistory = new JobHistory();
- jobHistory.init(conf);
-
- JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- LOG.info("JobHistoryFile is: " + historyFilePath);
+
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
+ LOG.info("Can not get FileContext", ioe);
+ throw (new Exception("Can not get File Context"));
}
-
- JobHistoryParser parser = new JobHistoryParser(in);
- final EventReader realReader = new EventReader(in);
- EventReader reader = Mockito.mock(EventReader.class);
+
if (numMaps == numSuccessfulMaps) {
- reader = realReader;
- } else {
- final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
- Mockito.when(reader.getNextEvent()).thenAnswer(
- new Answer<HistoryEvent>() {
- public HistoryEvent answer(InvocationOnMock invocation)
- throws IOException {
- HistoryEvent event = realReader.getNextEvent();
- if (event instanceof TaskFinishedEvent) {
- numFinishedEvents.incrementAndGet();
- }
-
- if (numFinishedEvents.get() <= numSuccessfulMaps) {
- return event;
- } else {
- throw new IOException("test");
+ String summaryFileName = JobHistoryUtils
+ .getIntermediateSummaryFileName(jobId);
+ Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+ String jobSummaryString = getJobSummary(fc, summaryFile);
+ Assert.assertNotNull(jobSummaryString);
+ Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+ Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+
+ Map<String, String> jobSummaryElements = new HashMap<String, String>();
+ StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+ while (strToken.hasMoreTokens()) {
+ String keypair = strToken.nextToken();
+ jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+ }
+
+ Assert.assertEquals("JobId does not match", jobId.toString(),
+ jobSummaryElements.get("jobId"));
+ Assert.assertEquals("JobName does not match", "test",
+ jobSummaryElements.get("jobName"));
+ Assert.assertTrue("submitTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+ Assert.assertTrue("launchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+ Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+ Assert
+ .assertTrue(
+ "firstReduceTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+ Assert.assertTrue("finishTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+ Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+ Integer.parseInt(jobSummaryElements.get("numMaps")));
+ Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+ Integer.parseInt(jobSummaryElements.get("numReduces")));
+ Assert.assertEquals("User does not match", System.getProperty("user.name"),
+ jobSummaryElements.get("user"));
+ Assert.assertEquals("Queue does not match", "default",
+ jobSummaryElements.get("queue"));
+ Assert.assertEquals("Status does not match", "SUCCEEDED",
+ jobSummaryElements.get("status"));
+ }
+
+ JobHistory jobHistory = new JobHistory();
+ jobHistory.init(conf);
+ HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+ JobInfo jobInfo;
+ long numFinishedMaps;
+
+ synchronized(fileInfo) {
+ Path historyFilePath = fileInfo.getHistoryFile();
+ FSDataInputStream in = null;
+ LOG.info("JobHistoryFile is: " + historyFilePath);
+ try {
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
+
+ JobHistoryParser parser = new JobHistoryParser(in);
+ final EventReader realReader = new EventReader(in);
+ EventReader reader = Mockito.mock(EventReader.class);
+ if (numMaps == numSuccessfulMaps) {
+ reader = realReader;
+ } else {
+ final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+ Mockito.when(reader.getNextEvent()).thenAnswer(
+ new Answer<HistoryEvent>() {
+ public HistoryEvent answer(InvocationOnMock invocation)
+ throws IOException {
+ HistoryEvent event = realReader.getNextEvent();
+ if (event instanceof TaskFinishedEvent) {
+ numFinishedEvents.incrementAndGet();
+ }
+
+ if (numFinishedEvents.get() <= numSuccessfulMaps) {
+ return event;
+ } else {
+ throw new IOException("test");
+ }
}
}
- }
);
- }
-
- JobInfo jobInfo = parser.parse(reader);
-
- long numFinishedMaps =
+ }
+
+ jobInfo = parser.parse(reader);
+
+ numFinishedMaps =
computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
-
- if (numFinishedMaps != numMaps) {
- Exception parseException = parser.getParseException();
- Assert.assertNotNull("Didn't get expected parse exception",
- parseException);
+
+ if (numFinishedMaps != numMaps) {
+ Exception parseException = parser.getParseException();
+ Assert.assertNotNull("Didn't get expected parse exception",
+ parseException);
+ }
}
Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
@@ -246,52 +308,6 @@ public class TestJobHistoryParsing {
}
}
}
-
- if (numMaps == numSuccessfulMaps) {
-
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobId);
- Path summaryFile = new Path(jobhistoryDir, summaryFileName);
- String jobSummaryString = getJobSummary(fc, summaryFile);
- Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
- Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
- Assert.assertNotNull(jobSummaryString);
-
- Map<String, String> jobSummaryElements = new HashMap<String, String>();
- StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
- while (strToken.hasMoreTokens()) {
- String keypair = strToken.nextToken();
- jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
-
- }
-
- Assert.assertEquals("JobId does not match", jobId.toString(),
- jobSummaryElements.get("jobId"));
- Assert.assertEquals("JobName does not match", "test",
- jobSummaryElements.get("jobName"));
- Assert.assertTrue("submitTime should not be 0",
- Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
- Assert.assertTrue("launchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
- Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
- Assert
- .assertTrue(
- "firstReduceTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
- Assert.assertTrue("finishTime should not be 0",
- Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
- Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
- Integer.parseInt(jobSummaryElements.get("numMaps")));
- Assert.assertEquals("Mismatch in num reduce slots", numReduces,
- Integer.parseInt(jobSummaryElements.get("numReduces")));
- Assert.assertEquals("User does not match", System.getProperty("user.name"),
- jobSummaryElements.get("user"));
- Assert.assertEquals("Queue does not match", "default",
- jobSummaryElements.get("queue"));
- Assert.assertEquals("Status does not match", "SUCCEEDED",
- jobSummaryElements.get("status"));
- }
}
// Computes finished maps similar to RecoveryService...
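The hunk above replaces the real EventReader with a Mockito mock whose Answer replays events from the underlying reader until the expected number of TaskFinishedEvents has been seen, then throws, so the parser is exercised against a truncated history stream. A minimal, self-contained sketch of the same stubbing technique follows; EventReader here is a stand-in interface, not the Hadoop class, and for simplicity it counts every event rather than only TaskFinishedEvents:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TruncatingReaderSketch {

  // Stand-in for Hadoop's EventReader; not the real class.
  interface EventReader {
    String getNextEvent() throws IOException;
  }

  // Replays events from the real reader until 'limit' events have been
  // returned, then throws to simulate a truncated history stream.
  static EventReader truncateAfter(final EventReader real, final int limit)
      throws IOException {
    final AtomicInteger seen = new AtomicInteger(0);
    EventReader mock = Mockito.mock(EventReader.class);
    Mockito.when(mock.getNextEvent()).thenAnswer(new Answer<String>() {
      public String answer(InvocationOnMock invocation) throws IOException {
        String event = real.getNextEvent();
        if (seen.incrementAndGet() <= limit) {
          return event;
        }
        throw new IOException("simulated truncated history stream");
      }
    });
    return mock;
  }
}

Note that the real reader is consulted inside the Answer, so the mock stays in sync with the actual file contents right up to the cut-off point.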
@@ -314,6 +330,8 @@ public class TestJobHistoryParsing {
@Test
public void testHistoryParsingForFailedAttempts() throws Exception {
+ LOG.info("STARTING testHistoryParsingForFailedAttempts");
+ try {
Configuration conf = new Configuration();
conf
.setClass(
@@ -335,7 +353,7 @@ public class TestJobHistoryParsing {
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
- JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+ JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
.getJobIndexInfo();
String jobhistoryFileName = FileNameIndexUtils
.getDoneFileName(jobIndexInfo);
@@ -372,6 +390,9 @@ public class TestJobHistoryParsing {
}
}
Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+ } finally {
+ LOG.info("FINISHED testHistoryParsingForFailedAttempts");
+ }
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
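For reference, the job summary line those assertions pick apart is a single comma-separated list of key=value pairs. A minimal sketch of the parsing step the test performs before asserting on individual fields, assuming every token contains exactly one '=' (as the summary format used here guarantees):

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class JobSummaryParseSketch {
  // Splits "jobId=...,jobName=...,status=..." into a key/value map.
  static Map<String, String> parse(String jobSummaryString) {
    Map<String, String> elements = new HashMap<String, String>();
    StringTokenizer tokens = new StringTokenizer(jobSummaryString, ",");
    while (tokens.hasMoreTokens()) {
      String keypair = tokens.nextToken();
      int eq = keypair.indexOf('='); // assumes one '=' per token
      elements.put(keypair.substring(0, eq), keypair.substring(eq + 1));
    }
    return elements;
  }

  public static void main(String[] args) {
    Map<String, String> m = parse("jobId=job_1_0001,status=SUCCEEDED");
    System.out.println(m.get("status")); // SUCCEEDED
  }
}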
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml Tue Apr 24 19:05:09 2012
@@ -292,7 +292,7 @@
<property><!--Loaded from job.xml--><name>yarn.resourcemanager.scheduler.address</name><value>0.0.0.0:8030</value></property>
<property><!--Loaded from job.xml--><name>fs.trash.checkpoint.interval</name><value>0</value></property>
<property><!--Loaded from job.xml--><name>s3native.stream-buffer-size</name><value>4096</value></property>
-<property><!--Loaded from job.xml--><name>yarn.scheduler.fifo.minimum-allocation-mb</name><value>1024</value></property>
+<property><!--Loaded from job.xml--><name>yarn.scheduler.minimum-allocation-mb</name><value>128</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.reduce.shuffle.read.timeout</name><value>180000</value></property>
<property><!--Loaded from job.xml--><name>yarn.app.mapreduce.am.command-opts</name><value>-Xmx500m</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.admin.user.env</name><value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native</value></property>
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue Apr 24 19:05:09 2012
@@ -282,7 +282,7 @@ public class ClientServiceDelegate {
}
private synchronized Object invoke(String method, Class argClass,
- Object args) throws YarnRemoteException {
+ Object args) throws IOException {
Method methodOb = null;
try {
methodOb = MRClientProtocol.class.getMethod(method, argClass);
@@ -291,7 +291,11 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) {
throw new YarnException("Method name mismatch", e);
}
- while (true) {
+ int maxRetries = this.conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ IOException lastException = null;
+ while (maxRetries > 0) {
try {
return methodOb.invoke(getProxy(), args);
} catch (YarnRemoteException yre) {
@@ -308,13 +312,21 @@ public class ClientServiceDelegate {
" retrying..", e.getTargetException());
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // HS/AMS shut down
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e);
// Force reconnection by setting the proxy to null.
realProxy = null;
+ // RM shutdown
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
}
}
+ throw lastException;
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@@ -364,7 +376,7 @@ public class ClientServiceDelegate {
return result;
}
- public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
@@ -390,7 +402,7 @@ public class ClientServiceDelegate {
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
- throws YarnRemoteException, YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetTaskReportsRequest request =
@@ -407,7 +419,7 @@ public class ClientServiceDelegate {
}
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
if (fail) {
@@ -423,7 +435,7 @@ public class ClientServiceDelegate {
}
public boolean killJob(JobID oldJobID)
- throws YarnRemoteException {
+ throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
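The invoke() change earlier in this file replaces an unbounded while (true) with a retry budget read from MRJobConfig.MR_CLIENT_MAX_RETRIES: each connection failure decrements the budget, records the failure, and drops the cached proxy, and the last recorded failure is rethrown once the budget is exhausted. A minimal generic sketch of that pattern, with Callable standing in for the reflective proxy invocation:

import java.io.IOException;
import java.util.concurrent.Callable;

public class BoundedRetrySketch {
  // Invokes 'call' up to 'maxRetries' times, rethrowing the last failure
  // once the retry budget is spent. Assumes maxRetries > 0, as the
  // configured default is.
  static <T> T invokeWithRetries(Callable<T> call, int maxRetries)
      throws IOException {
    IOException lastException = null;
    while (maxRetries > 0) {
      try {
        return call.call();
      } catch (Exception e) {
        maxRetries--;
        lastException = new IOException(e.getMessage(), e);
        // A real client would also drop its cached proxy here so the
        // next attempt reconnects (realProxy = null in the hunk above).
      }
    }
    throw lastException;
  }
}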
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Apr 24 19:05:09 2012
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
@@ -88,12 +87,10 @@ public class ResourceMgrDelegate {
public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
- InetSocketAddress rmAddress =
- NetUtils.createSocketAddr(this.conf.get(
+ InetSocketAddress rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS),
- YarnConfiguration.DEFAULT_RM_PORT,
- YarnConfiguration.RM_ADDRESS);
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
this.rmAddress = rmAddress.toString();
LOG.debug("Connecting to ResourceManager at " + rmAddress);
applicationsManager =
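The change above collapses the NetUtils.createSocketAddr call into Configuration.getSocketAddr, which resolves the configured host:port against a default address and default port in one step. A minimal sketch; the key and the defaults are written out literally here rather than via the YarnConfiguration constants, and are illustrative only:

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;

public class SocketAddrSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to "0.0.0.0:8032" when the key is unset; a configured
    // value without a port gets the default port appended.
    InetSocketAddress addr = conf.getSocketAddr(
        "yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
    System.out.println("Connecting to ResourceManager at " + addr);
  }
}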
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java Tue Apr 24 19:05:09 2012
@@ -128,7 +128,7 @@ public class DistributedFSCheck extends
if (rootStatus.isFile()) {
nrFiles++;
// For a regular file generate <fName,offset> pairs
- long blockSize = fs.getDefaultBlockSize();
+ long blockSize = fs.getDefaultBlockSize(rootFile);
long fileLength = rootStatus.getLen();
for(long offset = 0; offset < fileLength; offset += blockSize)
writer.append(new Text(rootFile.toString()), new LongWritable(offset));
@@ -160,15 +160,16 @@ public class DistributedFSCheck extends
) throws IOException {
// open file
FSDataInputStream in = null;
+ Path p = new Path(name);
try {
- in = fs.open(new Path(name));
+ in = fs.open(p);
} catch(IOException e) {
return name + "@(missing)";
}
in.seek(offset);
long actualSize = 0;
try {
- long blockSize = fs.getDefaultBlockSize();
+ long blockSize = fs.getDefaultBlockSize(p);
reporter.setStatus("reading " + name + "@" +
offset + "/" + blockSize);
for( int curSize = bufferSize;
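Both hunks in this file move from the no-argument fs.getDefaultBlockSize() to the per-path overload, so the block size comes from the file system that actually backs the path, which matters once viewfs or federation mounts several file systems together. A minimal sketch, assuming a path on the process's default FileSystem:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/tmp/example.dat"); // illustrative path
    // Per-path overload: resolves the block size for the file system
    // actually backing this path.
    long blockSize = fs.getDefaultBlockSize(p);
    System.out.println("default block size for " + p + " = " + blockSize);
  }
}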
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Tue Apr 24 19:05:09 2012
@@ -83,7 +83,9 @@ public class TestSocketFactory {
JobConf jobConf = new JobConf();
FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
- JobConf jconf = new JobConf(cconf);
+ JobConf jconf = new JobConf(miniMRYarnCluster.getConfig());
+ jconf.set("hadoop.rpc.socket.factory.class.default",
+ "org.apache.hadoop.ipc.DummySocketFactory");
jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
String rmAddress = jconf.get("yarn.resourcemanager.address");
String[] split = rmAddress.split(":");
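The fix above derives the client JobConf from the running mini cluster's configuration and then layers the socket factory key on top, so job submission RPCs flow through the custom factory while still targeting the cluster's real addresses. A sketch of a counting socket factory in the spirit of the test's DummySocketFactory (not the actual class); anything named by hadoop.rpc.socket.factory.class.default must be public with a no-arg constructor so it can be instantiated reflectively:

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.SocketFactory;

public class CountingSocketFactory extends SocketFactory {
  // Incremented on every socket created through this factory, so a test
  // can assert that RPC traffic actually flowed through it.
  static final AtomicInteger sockets = new AtomicInteger(0);

  @Override
  public Socket createSocket() throws IOException {
    sockets.incrementAndGet();
    return new Socket();
  }

  @Override
  public Socket createSocket(String host, int port) throws IOException {
    sockets.incrementAndGet();
    return new Socket(host, port);
  }

  @Override
  public Socket createSocket(String host, int port, InetAddress localHost,
      int localPort) throws IOException {
    sockets.incrementAndGet();
    return new Socket(host, port, localHost, localPort);
  }

  @Override
  public Socket createSocket(InetAddress host, int port) throws IOException {
    sockets.incrementAndGet();
    return new Socket(host, port);
  }

  @Override
  public Socket createSocket(InetAddress address, int port,
      InetAddress localAddress, int localPort) throws IOException {
    sockets.incrementAndGet();
    return new Socket(address, port, localAddress, localPort);
  }
}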
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Tue Apr 24 19:05:09 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
- new RuntimeException("1")).thenThrow(new RuntimeException("2"))
- .thenThrow(new RuntimeException("3"))
+ new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
- verify(historyServerProxy, times(4)).getJobReport(
+ verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
any(String.class));
}
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
+ Configuration conf = new YarnConfiguration();
+ testRMDownForJobStatusBeforeGetAMReport(conf,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ }
+
+ @Test
+ public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
+ testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
+ }
+
+ @Test
+ public void testRMDownRestoreForJobStatusBeforeGetAMReport()
+ throws IOException {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
+
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
+ .thenReturn(getJobReportResponse());
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+ verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
+ Assert.assertNotNull(jobStatus);
+ }
+
+ private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
+ int noOfRetries) throws YarnRemoteException {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+ !isAMReachableFromClient);
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced1"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced2"))).thenThrow(
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
+ "Connection refuced3")));
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+ conf, rmDelegate, oldJobId, historyServerProxy);
+ try {
+ clientServiceDelegate.getJobStatus(oldJobId);
+ Assert.fail("It should throw exception after retries");
+ } catch (IOException e) {
+ System.out.println("fail to get job status,and e=" + e.toString());
+ }
+ verify(rmDelegate, times(noOfRetries)).getApplicationReport(
+ any(ApplicationId.class));
+ }
+
private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId);
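The new tests above drive the retry budget by stubbing getApplicationReport to fail a fixed number of times, wrapped in UndeclaredThrowableException (the form in which a dynamic proxy surfaces undeclared checked exceptions), and then verifying the exact call count. A standalone sketch of the stub-then-verify pattern; Reporter is a stand-in interface, not a Hadoop type:

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;

import org.mockito.Mockito;

public class RetryVerifySketch {

  // Stand-in for the RM delegate's report call.
  interface Reporter {
    String getReport(String appId) throws IOException;
  }

  public static void main(String[] args) throws IOException {
    Reporter rm = Mockito.mock(Reporter.class);
    // Fail twice the way a dynamic proxy would, then succeed.
    Mockito.when(rm.getReport("app_1"))
        .thenThrow(new UndeclaredThrowableException(
            new IOException("Connection refused1")))
        .thenThrow(new UndeclaredThrowableException(
            new IOException("Connection refused2")))
        .thenReturn("FINISHED");

    String report = null;
    for (int retries = 3; retries > 0; retries--) {
      try {
        report = rm.getReport("app_1");
        break;
      } catch (UndeclaredThrowableException e) {
        // retry on connection failure
      }
    }
    System.out.println("report = " + report); // FINISHED on the third call
    Mockito.verify(rm, Mockito.times(3)).getReport("app_1");
  }
}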
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Apr 24 19:05:09 2012
@@ -173,7 +173,7 @@ public class TestMiniMRClasspath {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
- JobConf jobConf = new JobConf();
+ JobConf jobConf = mr.createJobConf();
String result;
result = launchWordCount(fileSys.getUri(), jobConf,
"The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1);
@@ -205,7 +205,7 @@ public class TestMiniMRClasspath {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
- JobConf jobConf = new JobConf();
+ JobConf jobConf = mr.createJobConf();
String result;
result = launchExternal(fileSys.getUri(), jobConf,
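Both fixes above replace a bare new JobConf() with mr.createJobConf(); only the latter carries the mini cluster's addresses, so a job built from a fresh JobConf would run against local defaults instead of the cluster under test. A one-method sketch of the distinction:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;

public class MiniClusterConfSketch {
  // Given an already-running mini cluster, derive the JobConf for job
  // submission from it; "new JobConf()" would point at local defaults
  // and the job would never reach the mini cluster.
  static JobConf jobConfFor(MiniMRCluster mr) {
    return mr.createJobConf();
  }
}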
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Tue Apr 24 19:05:09 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -126,6 +128,10 @@ public class MiniMRYarnCluster extends M
@Override
public synchronized void start() {
try {
+ getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
+ getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
historyServer = new JobHistoryServer();
historyServer.init(getConfig());
new Thread() {
@@ -145,6 +151,20 @@ public class MiniMRYarnCluster extends M
} catch (Throwable t) {
throw new YarnException(t);
}
+ // Need to do this because historyServer.init() creates a new Configuration
+ getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+ historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+ getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+ historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+
+ LOG.info("MiniMRYARN ResourceManager address: " +
+ getConfig().get(YarnConfiguration.RM_ADDRESS));
+ LOG.info("MiniMRYARN ResourceManager web address: " +
+ getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer web address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
}
@Override
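The MiniMRYarnCluster change binds the history server to hostname:0 so the OS assigns free ports, then copies the actually-bound addresses back from the server's own Configuration, since init() works on a copy and the original config would otherwise still read ":0". A minimal sketch of the bind-to-port-zero, read-back-the-address pattern using a plain ServerSocket:

import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class EphemeralPortSketch {
  public static void main(String[] args) throws Exception {
    // Requesting port 0 lets the OS pick any free port...
    ServerSocket server = new ServerSocket();
    server.bind(new InetSocketAddress("localhost", 0));
    try {
      // ...so the real address must be read back after binding, just as
      // the mini cluster reads MR_HISTORY_ADDRESS back from the history
      // server's config once it has started.
      System.out.println("bound to localhost:" + server.getLocalPort());
    } finally {
      server.close();
    }
  }
}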