Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/04/24 21:05:20 UTC

svn commit: r1329947 [2/4] - in /hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ hadoop-mapreduce-...

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Tue Apr 24 19:05:09 2012
@@ -25,12 +25,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class provides a way to interact with history files in a thread safe
  * manner.
@@ -67,33 +74,251 @@ public class HistoryFileManager extends 
   private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
   private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
 
+  private static enum HistoryInfoState {
+    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
+  };
+
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
 
-  public static class MetaInfo {
+  /**
+   * Maps between a serial number (generated based on jobId) and the timestamp
+   * component(s) to which it belongs. Facilitates jobId-based searches. If a
+   * jobId is not present in this index, the corresponding job cannot be found.
+   */
+  private static class SerialNumberIndex {
+    private SortedMap<String, Set<String>> cache;
+    private int maxSize;
+
+    public SerialNumberIndex(int maxSize) {
+      this.cache = new TreeMap<String, Set<String>>();
+      this.maxSize = maxSize;
+    }
+
+    public synchronized void add(String serialPart, String timestampPart) {
+      if (!cache.containsKey(serialPart)) {
+        cache.put(serialPart, new HashSet<String>());
+        if (cache.size() > maxSize) {
+          String key = cache.firstKey();
+          LOG.error("Dropping " + key
+              + " from the SerialNumberIndex. We will no "
+              + "longer be able to see jobs that are in that serial index for "
+              + cache.get(key));
+          cache.remove(key);
+        }
+      }
+      Set<String> datePartSet = cache.get(serialPart);
+      datePartSet.add(timestampPart);
+    }
+
+    public synchronized void remove(String serialPart, String timeStampPart) {
+      if (cache.containsKey(serialPart)) {
+        Set<String> set = cache.get(serialPart);
+        set.remove(timeStampPart);
+        if (set.isEmpty()) {
+          cache.remove(serialPart);
+        }
+      }
+    }
+
+    public synchronized Set<String> get(String serialPart) {
+      Set<String> found = cache.get(serialPart);
+      if (found != null) {
+        return new HashSet<String>(found);
+      }
+      return null;
+    }
+  }
+
+  private static class JobListCache {
+    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+    private int maxSize;
+    private long maxAge;
+
+    public JobListCache(int maxSize, long maxAge) {
+      this.maxSize = maxSize;
+      this.maxAge = maxAge;
+      this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+    }
+
+    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
+      JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + jobId + " to job list cache with "
+            + fileInfo.getJobIndexInfo());
+      }
+      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
+      if (cache.size() > maxSize) {
+        // There is a race here, where more than one thread could be trying to
+        // remove entries.  This could result in too many entries being removed
+        // from the cache.  This is considered OK as the size of the cache
+        // should be rather large, and we would rather have performance over
+        // keeping the cache size exactly at the maximum.
+        Iterator<JobId> keys = cache.navigableKeySet().iterator();
+        long cutoff = System.currentTimeMillis() - maxAge;
+        while(cache.size() > maxSize && keys.hasNext()) {
+          JobId key = keys.next();
+          HistoryFileInfo firstValue = cache.get(key);
+          if(firstValue != null) {
+            synchronized(firstValue) {
+              if (firstValue.isMovePending()) {
+                if(firstValue.didMoveFail() && 
+                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
+                  cache.remove(key);
+                  // Now let's try to delete it
+                  try {
+                    firstValue.delete();
+                  } catch (IOException e) {
+                    LOG.error("Error while trying to delete history files" +
+                        " that could not be moved to done.", e);
+                  }
+                } else {
+                  LOG.warn("Waiting to remove " + key
+                      + " from JobListCache because it is not in done yet.");
+                }
+              } else {
+                cache.remove(key);
+              }
+            }
+          }
+        }
+      }
+      return old;
+    }
+
+    public void delete(HistoryFileInfo fileInfo) {
+      cache.remove(fileInfo.getJobId());
+    }
+
+    public Collection<HistoryFileInfo> values() {
+      return new ArrayList<HistoryFileInfo>(cache.values());
+    }
+
+    public HistoryFileInfo get(JobId jobId) {
+      return cache.get(jobId);
+    }
+  }
+
+  public class HistoryFileInfo {
     private Path historyFile;
     private Path confFile;
     private Path summaryFile;
     private JobIndexInfo jobIndexInfo;
+    private HistoryInfoState state;
 
-    public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
-        JobIndexInfo jobIndexInfo) {
+    private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo, boolean isInDone) {
       this.historyFile = historyFile;
       this.confFile = confFile;
       this.summaryFile = summaryFile;
       this.jobIndexInfo = jobIndexInfo;
+      state = isInDone ? HistoryInfoState.IN_DONE
+          : HistoryInfoState.IN_INTERMEDIATE;
     }
 
-    private Path getHistoryFile() {
-      return historyFile;
+    private synchronized boolean isMovePending() {
+      return state == HistoryInfoState.IN_INTERMEDIATE
+          || state == HistoryInfoState.MOVE_FAILED;
     }
 
-    private Path getConfFile() {
-      return confFile;
+    private synchronized boolean didMoveFail() {
+      return state == HistoryInfoState.MOVE_FAILED;
+    }
+    
+    /**
+     * @return true if the files backing this HistoryFileInfo have been deleted.
+     */
+    public synchronized boolean isDeleted() {
+      return state == HistoryInfoState.DELETED;
     }
 
-    private Path getSummaryFile() {
-      return summaryFile;
+    private synchronized void moveToDone() throws IOException {
+      if (!isMovePending()) {
+        // It was either deleted or is already in done. Either way do nothing
+        return;
+      }
+      try {
+        long completeTime = jobIndexInfo.getFinishTime();
+        if (completeTime == 0) {
+          completeTime = System.currentTimeMillis();
+        }
+        JobId jobId = jobIndexInfo.getJobId();
+
+        List<Path> paths = new ArrayList<Path>(2);
+        if (historyFile == null) {
+          LOG.info("No file for job-history with " + jobId + " found in cache!");
+        } else {
+          paths.add(historyFile);
+        }
+
+        if (confFile == null) {
+          LOG.info("No file for jobConf with " + jobId + " found in cache!");
+        } else {
+          paths.add(confFile);
+        }
+
+        if (summaryFile == null) {
+          LOG.info("No summary file for job: " + jobId);
+        } else {
+          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
+              summaryFile);
+          SUMMARY_LOG.info(jobSummaryString);
+          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+          intermediateDoneDirFc.delete(summaryFile, false);
+          summaryFile = null;
+        }
+
+        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
+        addDirectoryToSerialNumberIndex(targetDir);
+        makeDoneSubdir(targetDir);
+        if (historyFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
+              .getName()));
+          if (!toPath.equals(historyFile)) {
+            moveToDoneNow(historyFile, toPath);
+            historyFile = toPath;
+          }
+        }
+        if (confFile != null) {
+          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
+              .getName()));
+          if (!toPath.equals(confFile)) {
+            moveToDoneNow(confFile, toPath);
+            confFile = toPath;
+          }
+        }
+        state = HistoryInfoState.IN_DONE;
+      } catch (Throwable t) {
+        LOG.error("Error while trying to move a job to done", t);
+        this.state = HistoryInfoState.MOVE_FAILED;
+      }
+    }
+
+    /**
+     * Parse a job from the JobHistoryFile, if the underlying file is not going
+     * to be deleted.
+     * 
+     * @return the Job or null if the underlying file was deleted.
+     * @throws IOException
+     *           if there is an error trying to read the file.
+     */
+    public synchronized Job loadJob() throws IOException {
+      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+          false, jobIndexInfo.getUser(), this, aclsMgr);
+    }
+
+    /**
+     * Return the history file.  This should only be used for testing.
+     * @return the history file.
+     */
+    synchronized Path getHistoryFile() {
+      return historyFile;
+    }
+    
+    private synchronized void delete() throws IOException {
+      state = HistoryInfoState.DELETED;
+      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
     }
 
     public JobIndexInfo getJobIndexInfo() {
@@ -104,57 +329,35 @@ public class HistoryFileManager extends 
       return jobIndexInfo.getJobId();
     }
 
-    private void setHistoryFile(Path historyFile) {
-      this.historyFile = historyFile;
-    }
-
-    private void setConfFile(Path confFile) {
-      this.confFile = confFile;
+    public synchronized Path getConfFile() {
+      return confFile;
     }
-
-    private void setSummaryFile(Path summaryFile) {
-      this.summaryFile = summaryFile;
+    
+    public synchronized Configuration loadConfFile() throws IOException {
+      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
+      Configuration jobConf = new Configuration(false);
+      jobConf.addResource(fc.open(confFile));
+      return jobConf;
     }
   }
 
-  /**
-   * Maps between a serial number (generated based on jobId) and the timestamp
-   * component(s) to which it belongs. Facilitates jobId based searches. If a
-   * jobId is not found in this list - it will not be found.
-   */
-  private final SortedMap<String, Set<String>> idToDateString = 
-    new TreeMap<String, Set<String>>();
-  // The number of entries in idToDateString
-  private int dateStringCacheSize;
-
-  // Maintains minimal details for recent jobs (parsed from history file name).
-  // Sorted on Job Completion Time.
-  private final SortedMap<JobId, MetaInfo> jobListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
-  // The number of jobs to maintain in the job list cache.
-  private int jobListCacheSize;
-
-  // Re-use existing MetaInfo objects if they exist for the specific JobId.
-  // (synchronization on MetaInfo)
-  // Check for existence of the object when using iterators.
-  private final SortedMap<JobId, MetaInfo> intermediateListCache = 
-    new ConcurrentSkipListMap<JobId, MetaInfo>();
+  private SerialNumberIndex serialNumberIndex = null;
+  private JobListCache jobListCache = null;
 
   // Maintains a list of known done subdirectories.
-  private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+  private final Set<Path> existingDoneSubdirs = Collections
+      .synchronizedSet(new HashSet<Path>());
 
   /**
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    */
-  private Map<String, Long> userDirModificationTimeMap = 
-    new HashMap<String, Long>();
+  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
 
   private JobACLsManager aclsMgr;
 
   private Configuration conf;
 
-  // TODO Remove me!!!!
   private boolean debugMode;
   private String serialNumberFormat;
 
@@ -165,6 +368,9 @@ public class HistoryFileManager extends 
   private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                              // FileContext
 
+  private ThreadPoolExecutor moveToDoneExecutor = null;
+  private long maxHistoryAge = 0;
+  
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -211,12 +417,25 @@ public class HistoryFileManager extends 
 
     this.aclsMgr = new JobACLsManager(conf);
 
-    jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+    
+    jobListCache = new JobListCache(conf.getInt(
+        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
+        maxHistoryAge);
 
-    dateStringCacheSize = conf.getInt(
+    serialNumberIndex = new SerialNumberIndex(conf.getInt(
         JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
+
+    int numMoveThreads = conf.getInt(
+        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "MoveIntermediateToDone Thread #%d").build();
+    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
+        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
 
     super.init(conf);
   }
@@ -249,6 +468,7 @@ public class HistoryFileManager extends 
   void initExisting() throws IOException {
     LOG.info("Initializing Existing Jobs...");
     List<FileStatus> timestampedDirList = findTimestampedDirectories();
+    // Sort first just so insertion is in a consistent order
     Collections.sort(timestampedDirList);
     for (FileStatus fs : timestampedDirList) {
       // TODO Could verify the correct format for these directories.
@@ -271,16 +491,7 @@ public class HistoryFileManager extends 
           + serialDirPath.toString() + ". Continuing with next");
       return;
     }
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (idToDateString.containsKey(serialPart)) {
-        Set<String> set = idToDateString.get(serialPart);
-        set.remove(timeStampPart);
-        if (set.isEmpty()) {
-          idToDateString.remove(serialPart);
-        }
-      }
-    }
+    serialNumberIndex.remove(serialPart, timeStampPart);
   }
 
   private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
@@ -299,21 +510,7 @@ public class HistoryFileManager extends 
       LOG.warn("Could not find serial portion from path: "
           + serialDirPath.toString() + ". Continuing with next");
     }
-    addToSerialNumberIndex(serialPart, timestampPart);
-  }
-
-  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
-    synchronized (idToDateString) {
-      // TODO make this thread safe without the synchronize
-      if (!idToDateString.containsKey(serialPart)) {
-        idToDateString.put(serialPart, new HashSet<String>());
-        if (idToDateString.size() > dateStringCacheSize) {
-          idToDateString.remove(idToDateString.firstKey());
-        }
-        Set<String> datePartSet = idToDateString.get(serialPart);
-        datePartSet.add(timestampPart);
-      }
-    }
+    serialNumberIndex.add(serialPart, timestampPart);
   }
 
   private void addDirectoryToJobListCache(Path path) throws IOException {
@@ -332,10 +529,10 @@ public class HistoryFileManager extends 
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      addToJobListCache(metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, true);
+      jobListCache.addIfAbsent(fileInfo);
     }
   }
 
@@ -371,25 +568,18 @@ public class HistoryFileManager extends 
     return fsList;
   }
 
-  private void addToJobListCache(MetaInfo metaInfo) {
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding " + jobId + " to job list cache with "
-          + metaInfo.getJobIndexInfo());
-    }
-    jobListCache.put(jobId, metaInfo);
-    if (jobListCache.size() > jobListCacheSize) {
-      jobListCache.remove(jobListCache.firstKey());
-    }
-  }
-
   /**
    * Scans the intermediate directory to find user directories. Scans these for
-   * history files if the modification time for the directory has changed.
+   * history files if the modification time for the directory has changed. Once
+   * it finds history files, it starts the process of moving them to the done
+   * directory.
    * 
    * @throws IOException
+   *           if there was an error while scanning
    */
-  private void scanIntermediateDirectory() throws IOException {
+  void scanIntermediateDirectory() throws IOException {
+    // TODO it would be great to limit how often this happens, except in the
+    // case where we are looking for a particular job.
     List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
         intermediateDoneDirFc, intermediateDoneDirPath, "");
 
@@ -405,7 +595,12 @@ public class HistoryFileManager extends 
         }
       }
       if (shouldScan) {
-        scanIntermediateDirectory(userDir.getPath());
+        try {
+          scanIntermediateDirectory(userDir.getPath());
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory " 
+              + userDir.getPath(), e);
+        }
       }
     }
   }
@@ -426,11 +621,33 @@ public class HistoryFileManager extends 
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), new Path(fs.getPath().getParent(),
-          summaryFileName), jobIndexInfo);
-      if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-        intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+          .getPath().getParent(), confFileName), new Path(fs.getPath()
+          .getParent(), summaryFileName), jobIndexInfo, false);
+
+      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
+      if (old == null || old.didMoveFail()) {
+        final HistoryFileInfo found = (old == null) ? fileInfo : old;
+        long cutoff = System.currentTimeMillis() - maxHistoryAge;
+        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
+          try {
+            found.delete();
+          } catch (IOException e) {
+            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
+          }
+        } else {
+          moveToDoneExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                found.moveToDone();
+              } catch (IOException e) {
+                LOG.info("Failed to process fileInfo for job: " + 
+                    found.getJobId(), e);
+              }
+            }
+          });
+        }
       }
     }
   }
@@ -442,11 +659,11 @@ public class HistoryFileManager extends 
    *          fileStatus list of Job History Files.
    * @param jobId
    *          The JobId to find.
-   * @return A MetaInfo object for the jobId, null if not found.
+   * @return A HistoryFileInfo object for the jobId, or null if not found.
    * @throws IOException
    */
-  private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
-      throws IOException {
+  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
+      JobId jobId) throws IOException {
     for (FileStatus fs : fileStatusList) {
       JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
           .getName());
@@ -455,10 +672,10 @@ public class HistoryFileManager extends 
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
         String summaryFileName = JobHistoryUtils
             .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), new Path(fs.getPath().getParent(),
-            summaryFileName), jobIndexInfo);
-        return metaInfo;
+        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
+            .getParent(), summaryFileName), jobIndexInfo, true);
+        return fileInfo;
       }
     }
     return null;
@@ -474,175 +691,51 @@ public class HistoryFileManager extends 
    * @return
    * @throws IOException
    */
-  private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
     int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
     String boxedSerialNumber = String.valueOf(jobSerialNumber);
-    Set<String> dateStringSet;
-    synchronized (idToDateString) {
-      Set<String> found = idToDateString.get(boxedSerialNumber);
-      if (found == null) {
-        return null;
-      } else {
-        dateStringSet = new HashSet<String>(found);
-      }
+    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
+    if (dateStringSet == null) {
+      return null;
     }
     for (String timestampPart : dateStringSet) {
       Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
       List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
           doneDirFc);
-      MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-      if (metaInfo != null) {
-        return metaInfo;
+      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
+      if (fileInfo != null) {
+        return fileInfo;
       }
     }
     return null;
   }
 
-  /**
-   * Checks for the existence of the job history file in the intermediate
-   * directory.
-   * 
-   * @param jobId
-   * @return
-   * @throws IOException
-   */
-  private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.get(jobId);
-  }
-
-  /**
-   * Parse a job from the JobHistoryFile, if the underlying file is not going to
-   * be deleted.
-   * 
-   * @param metaInfo
-   *          the where the JobHistory is stored.
-   * @return the Job or null if the underlying file was deleted.
-   * @throws IOException
-   *           if there is an error trying to read the file.
-   */
-  public Job loadJob(MetaInfo metaInfo) throws IOException {
-    return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
-        metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
-        metaInfo.getConfFile(), aclsMgr);
-  }
-
-  public Collection<MetaInfo> getAllMetaInfo() throws IOException {
+  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
     scanIntermediateDirectory();
-    ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
-    result.addAll(intermediateListCache.values());
-    result.addAll(jobListCache.values());
-    return result;
+    return jobListCache.values();
   }
 
-  Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
-    scanIntermediateDirectory();
-    return intermediateListCache.values();
-  }
-
-  public MetaInfo getMetaInfo(JobId jobId) throws IOException {
-    // MetaInfo available in cache.
-    MetaInfo metaInfo = null;
-    if (jobListCache.containsKey(jobId)) {
-      metaInfo = jobListCache.get(jobId);
-    }
-
-    if (metaInfo != null) {
-      return metaInfo;
+  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
+    // FileInfo available in cache.
+    HistoryFileInfo fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
-
-    // MetaInfo not available. Check intermediate directory for meta info.
-    metaInfo = scanIntermediateForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    // Not in the cache, so scan the intermediate directory to be sure we did not miss it
+    scanIntermediateDirectory();
+    fileInfo = jobListCache.get(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
 
     // Intermediate directory does not contain job. Search through older ones.
-    metaInfo = scanOldDirsForJob(jobId);
-    if (metaInfo != null) {
-      return metaInfo;
+    fileInfo = scanOldDirsForJob(jobId);
+    if (fileInfo != null) {
+      return fileInfo;
     }
     return null;
   }
 
-  void moveToDone(MetaInfo metaInfo) throws IOException {
-    long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
-    if (completeTime == 0)
-      completeTime = System.currentTimeMillis();
-    JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
-    List<Path> paths = new ArrayList<Path>();
-    Path historyFile = metaInfo.getHistoryFile();
-    if (historyFile == null) {
-      LOG.info("No file for job-history with " + jobId + " found in cache!");
-    } else {
-      paths.add(historyFile);
-    }
-
-    Path confFile = metaInfo.getConfFile();
-    if (confFile == null) {
-      LOG.info("No file for jobConf with " + jobId + " found in cache!");
-    } else {
-      paths.add(confFile);
-    }
-
-    // TODO Check all mi getters and setters for the conf path
-    Path summaryFile = metaInfo.getSummaryFile();
-    if (summaryFile == null) {
-      LOG.info("No summary file for job: " + jobId);
-    } else {
-      try {
-        String jobSummaryString = getJobSummary(intermediateDoneDirFc,
-            summaryFile);
-        SUMMARY_LOG.info(jobSummaryString);
-        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
-        intermediateDoneDirFc.delete(summaryFile, false);
-        metaInfo.setSummaryFile(null);
-      } catch (IOException e) {
-        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
-        throw e;
-      }
-    }
-
-    Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
-    addDirectoryToSerialNumberIndex(targetDir);
-    try {
-      makeDoneSubdir(targetDir);
-    } catch (IOException e) {
-      LOG.warn("Failed creating subdirectory: " + targetDir
-          + " while attempting to move files for jobId: " + jobId);
-      throw e;
-    }
-    synchronized (metaInfo) {
-      if (historyFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
-            .getName()));
-        try {
-          moveToDoneNow(historyFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setHistoryFile(toPath);
-      }
-      if (confFile != null) {
-        Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
-            .getName()));
-        try {
-          moveToDoneNow(confFile, toPath);
-        } catch (IOException e) {
-          LOG.warn("Failed to move file: " + historyFile + " for jobId: "
-              + jobId);
-          throw e;
-        }
-        metaInfo.setConfFile(toPath);
-      }
-    }
-    addToJobListCache(metaInfo);
-    intermediateListCache.remove(jobId);
-  }
-
   private void moveToDoneNow(final Path src, final Path target)
       throws IOException {
     LOG.info("Moving " + src.toString() + " to " + target.toString());
@@ -658,20 +751,9 @@ public class HistoryFileManager extends 
   }
 
   private void makeDoneSubdir(Path path) throws IOException {
-    boolean existsInExistingCache = false;
-    synchronized (existingDoneSubdirs) {
-      if (existingDoneSubdirs.contains(path))
-        existsInExistingCache = true;
-    }
     try {
       doneDirFc.getFileStatus(path);
-      if (!existsInExistingCache) {
-        existingDoneSubdirs.add(path);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
-              + " already existed, but it didn't.");
-        }
-      }
+      existingDoneSubdirs.add(path);
     } catch (FileNotFoundException fnfE) {
       try {
         FsPermission fsp = new FsPermission(
@@ -685,11 +767,8 @@ public class HistoryFileManager extends 
               + ", " + fsp);
           doneDirFc.setPermission(path, fsp);
         }
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.add(path);
-        }
-      } catch (FileAlreadyExistsException faeE) { 
-        // Nothing to do.
+        existingDoneSubdirs.add(path);
+      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
       }
     }
   }
@@ -713,16 +792,22 @@ public class HistoryFileManager extends 
     return finishTime;
   }
 
-  private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
-    jobListCache.remove(metaInfo.getJobId());
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
-    doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
+    jobListCache.delete(fileInfo);
+    fileInfo.delete();
   }
 
+  /**
+   * Clean up older history files.
+   * 
+   * @throws IOException
+   *           on any error trying to remove the entries.
+   */
   @SuppressWarnings("unchecked")
-  void clean(long cutoff, HistoryStorage storage) throws IOException {
+  void clean() throws IOException {
     // TODO this should be replaced by something that knows about the directory
     // structure and will put less of a load on HDFS.
+    long cutoff = System.currentTimeMillis() - maxHistoryAge;
     boolean halted = false;
     // TODO Delete YYYY/MM/DD directories.
     List<FileStatus> serialDirList = findTimestampedDirectories();
@@ -737,13 +822,17 @@ public class HistoryFileManager extends 
         long effectiveTimestamp = getEffectiveTimestamp(
             jobIndexInfo.getFinishTime(), historyFile);
         if (effectiveTimestamp <= cutoff) {
-          String confFileName = JobHistoryUtils
-              .getIntermediateConfFileName(jobIndexInfo.getJobId());
-          MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
-              historyFile.getPath().getParent(), confFileName), null,
-              jobIndexInfo);
-          storage.jobRemovedFromHDFS(metaInfo.getJobId());
-          deleteJobFromDone(metaInfo);
+          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
+              .getJobId());
+          if (fileInfo == null) {
+            String confFileName = JobHistoryUtils
+                .getIntermediateConfFileName(jobIndexInfo.getJobId());
+
+            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+                historyFile.getPath().getParent(), confFileName), null,
+                jobIndexInfo, true);
+          }
+          deleteJobFromDone(fileInfo);
         } else {
           halted = true;
           break;
@@ -752,9 +841,7 @@ public class HistoryFileManager extends 
       if (!halted) {
         doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
         removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-        synchronized (existingDoneSubdirs) {
-          existingDoneSubdirs.remove(serialDir.getPath());
-        }
+        existingDoneSubdirs.remove(serialDir.getPath());
       } else {
         break; // Don't scan any more directories.
       }

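[Note] A minimal, self-contained sketch (illustrative only, not part of the commit) of the bounded-index behaviour the new SerialNumberIndex above documents: entries are keyed by serial number, and once the configured maximum is exceeded the lowest key is evicted, after which jobs filed under that serial number can no longer be located. Class and method names here are placeholders.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.SortedMap;
    import java.util.TreeMap;

    /** Illustrative bounded index: the lowest serial number is evicted once
     *  the configured maximum is exceeded. */
    class BoundedSerialIndex {
      private final SortedMap<String, Set<String>> cache =
          new TreeMap<String, Set<String>>();
      private final int maxSize;

      BoundedSerialIndex(int maxSize) {
        this.maxSize = maxSize;
      }

      synchronized void add(String serialPart, String timestampPart) {
        Set<String> dates = cache.get(serialPart);
        if (dates == null) {
          dates = new HashSet<String>();
          cache.put(serialPart, dates);
          if (cache.size() > maxSize) {
            // Jobs filed under the dropped serial number become unfindable.
            cache.remove(cache.firstKey());
          }
        }
        dates.add(timestampPart);
      }

      synchronized Set<String> get(String serialPart) {
        Set<String> found = cache.get(serialPart);
        return found == null ? null : new HashSet<String>(found);
      }
    }

The committed class additionally logs which serial index is dropped, so operators can tell why an old job is no longer visible.
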
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java Tue Apr 24 19:05:09 2012
@@ -28,7 +28,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * Provides an API to query jobs that have finished. 
+ * Provides an API to query jobs that have finished.
+ * 
+ * For those implementing this API, be aware that there is no feedback when
+ * files are removed from HDFS. You may rely on HistoryFileManager to help
+ * you know when that has happened if you have not made a complete backup of
+ * the data stored on HDFS.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
@@ -71,10 +76,4 @@ public interface HistoryStorage {
    * @return the job, or null if it is not found.
    */
   Job getFullJob(JobId jobId);
-
-  /**
-   * Informs the Storage that a job has been removed from HDFS
-   * @param jobId the ID of the job that was removed.
-   */
-  void jobRemovedFromHDFS(JobId jobId);
 }

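[Note] With jobRemovedFromHDFS gone, implementations that cache parsed jobs have to revalidate on read rather than wait for a callback. A rough, generic sketch of that pattern (type and method names are hypothetical, not part of the HistoryStorage API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Illustrative read-through cache that revalidates entries on every get,
     *  because storage implementations no longer receive a removal callback. */
    class RevalidatingCache<K, V> {
      interface Validator<T> {
        boolean stillExists(T key); // e.g. answered by a HistoryFileManager lookup
      }

      private final Map<K, V> cache = new ConcurrentHashMap<K, V>();
      private final Validator<K> validator;

      RevalidatingCache(Validator<K> validator) {
        this.validator = validator;
      }

      void put(K key, V value) {
        cache.put(key, value);
      }

      V get(K key) {
        V value = cache.get(key);
        if (value != null && !validator.stillExists(key)) {
          cache.remove(key); // backing files are gone; drop the stale entry
          return null;
        }
        return value;
      }
    }
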
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Apr 24 19:05:09 2012
@@ -21,10 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -37,7 +34,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -66,15 +63,9 @@ public class JobHistory extends Abstract
   // Time interval for the move thread.
   private long moveThreadInterval;
 
-  // Number of move threads.
-  private int numMoveThreads;
-
   private Configuration conf;
 
-  private Thread moveIntermediateToDoneThread = null;
-  private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
-
-  private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
+  private ScheduledThreadPoolExecutor scheduledExecutor = null;
 
   private HistoryStorage storage = null;
   private HistoryFileManager hsManager = null;
@@ -91,8 +82,6 @@ public class JobHistory extends Abstract
     moveThreadInterval = conf.getLong(
         JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
-    numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
 
     hsManager = new HistoryFileManager();
     hsManager.init(conf);
@@ -120,27 +109,22 @@ public class JobHistory extends Abstract
       ((Service) storage).start();
     }
 
-    // Start moveIntermediatToDoneThread
-    moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
-        moveThreadInterval, numMoveThreads);
-    moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
-    moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
-    moveIntermediateToDoneThread.start();
+    scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+        new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
+            .build());
+
+    scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
+        moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
 
     // Start historyCleaner
     boolean startCleanerService = conf.getBoolean(
         JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
     if (startCleanerService) {
-      long maxAgeOfHistoryFiles = conf.getLong(
-          JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
-          JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
-      cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-          new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
       long runInterval = conf.getLong(
           JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
           JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
-      cleanerScheduledExecutor
-          .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+      scheduledExecutor
+          .scheduleAtFixedRate(new HistoryCleaner(),
               30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
     }
     super.start();
@@ -149,24 +133,12 @@ public class JobHistory extends Abstract
   @Override
   public void stop() {
     LOG.info("Stopping JobHistory");
-    if (moveIntermediateToDoneThread != null) {
-      LOG.info("Stopping move thread");
-      moveIntermediateToDoneRunnable.stop();
-      moveIntermediateToDoneThread.interrupt();
-      try {
-        LOG.info("Joining on move thread");
-        moveIntermediateToDoneThread.join();
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted while stopping move thread");
-      }
-    }
-
-    if (cleanerScheduledExecutor != null) {
-      LOG.info("Stopping History Cleaner");
-      cleanerScheduledExecutor.shutdown();
+    if (scheduledExecutor != null) {
+      LOG.info("Stopping History Cleaner/Move To Done");
+      scheduledExecutor.shutdown();
       boolean interrupted = false;
       long currentTime = System.currentTimeMillis();
-      while (!cleanerScheduledExecutor.isShutdown()
+      while (!scheduledExecutor.isShutdown()
           && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
         try {
           Thread.sleep(20);
@@ -174,8 +146,10 @@ public class JobHistory extends Abstract
           interrupted = true;
         }
       }
-      if (!cleanerScheduledExecutor.isShutdown()) {
-        LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+      if (!scheduledExecutor.isShutdown()) {
+        LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
+        		"succeeded, Forcing a shutdown");
+        scheduledExecutor.shutdownNow();
       }
     }
     if (storage instanceof Service) {
@@ -195,68 +169,34 @@ public class JobHistory extends Abstract
   }
 
   private class MoveIntermediateToDoneRunnable implements Runnable {
-
-    private long sleepTime;
-    private ThreadPoolExecutor moveToDoneExecutor = null;
-    private boolean running = false;
-
-    public synchronized void stop() {
-      running = false;
-      notify();
-    }
-
-    MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
-      this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
-          "MoveIntermediateToDone Thread #%d").build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
-          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
-      running = true;
-    }
-
     @Override
     public void run() {
-      Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (true) {
-          LOG.info("Starting scan to move intermediate done files");
-          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
-            moveToDoneExecutor.execute(new Runnable() {
-              @Override
-              public void run() {
-                try {
-                  hsManager.moveToDone(metaInfo);
-                } catch (IOException e) {
-                  LOG.info(
-                      "Failed to process metaInfo for job: "
-                          + metaInfo.getJobId(), e);
-                }
-              }
-            });
-          }
-          synchronized (this) {
-            try {
-              this.wait(sleepTime);
-            } catch (InterruptedException e) {
-              LOG.info("IntermediateHistoryScannerThread interrupted");
-            }
-            if (!running) {
-              break;
-            }
-          }
-        }
+        LOG.info("Starting scan to move intermediate done files");
+        hsManager.scanIntermediateDirectory();
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved");
-        // TODO Shut down the entire process!!!!
+        LOG.error("Error while scanning intermediate done dir ", e);
       }
     }
   }
+  
+  private class HistoryCleaner implements Runnable {
+    public void run() {
+      LOG.info("History Cleaner started");
+      try {
+        hsManager.clean();
+      } catch (IOException e) {
+        LOG.warn("Error trying to clean up ", e);
+      }
+      LOG.info("History Cleaner complete");
+    }
+  }
 
   /**
    * Helper method for test cases.
    */
-  MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    return hsManager.getMetaInfo(jobId);
+  HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException {
+    return hsManager.getFileInfo(jobId);
   }
 
   @Override
@@ -313,25 +253,6 @@ public class JobHistory extends Abstract
         fBegin, fEnd, jobState);
   }
 
-  public class HistoryCleaner implements Runnable {
-    long maxAgeMillis;
-
-    public HistoryCleaner(long maxAge) {
-      this.maxAgeMillis = maxAge;
-    }
-
-    public void run() {
-      LOG.info("History Cleaner started");
-      long cutoff = System.currentTimeMillis() - maxAgeMillis;
-      try {
-        hsManager.clean(cutoff, storage);
-      } catch (IOException e) {
-        LOG.warn("Error trying to clean up ", e);
-      }
-      LOG.info("History Cleaner complete");
-    }
-  }
-
   // TODO AppContext - Not Required
   private ApplicationAttemptId appAttemptID;
 

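[Note] The dedicated move thread and the separate cleaner executor are replaced above by a single ScheduledThreadPoolExecutor running both periodic tasks. A small stand-alone sketch of that scheduling and shutdown pattern (class name, intervals, and the grace period are placeholders, not the JHAdminConfig defaults):

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /** Illustrative scheduler: one executor drives both the periodic move scan
     *  and the history cleaner, then shuts down with a short grace period. */
    public class TwoTaskScheduler {
      private final ScheduledThreadPoolExecutor scheduledExecutor =
          new ScheduledThreadPoolExecutor(2);

      public void start(Runnable moveScan, Runnable cleaner,
          long moveIntervalMs, long cleanIntervalMs) {
        scheduledExecutor.scheduleAtFixedRate(moveScan,
            moveIntervalMs, moveIntervalMs, TimeUnit.MILLISECONDS);
        scheduledExecutor.scheduleAtFixedRate(cleaner,
            30 * 1000L, cleanIntervalMs, TimeUnit.MILLISECONDS);
      }

      public void stop() throws InterruptedException {
        scheduledExecutor.shutdown();
        // Wait briefly for in-flight tasks, then force the shutdown.
        if (!scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
          scheduledExecutor.shutdownNow();
        }
      }
    }

Using awaitTermination keeps the grace period explicit before falling back to shutdownNow.
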
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue Apr 24 19:05:09 2012
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -166,6 +167,11 @@ public class PartialJob implements org.a
   public Path getConfFile() {
     throw new IllegalStateException("Not implemented yet");
   }
+  
+  @Override
+  public Configuration loadConfFile() {
+    throw new IllegalStateException("Not implemented yet");
+  }
 
   @Override
   public Map<JobACL, AccessControlList> getJobACLs() {

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java Tue Apr 24 19:05:09 2012
@@ -89,7 +89,8 @@ public class HsTaskPage extends HsView {
       headRow.
             th(".id", "Attempt").
             th(".state", "State").
-            th(".node", "node").
+            th(".node", "Node").
+            th(".logs", "Logs").
             th(".tsh", "Start Time");
       
       if(type == TaskType.REDUCE) {
@@ -144,10 +145,11 @@ public class HsTaskPage extends HsView {
             _(taid)._().td(ta.getState().toString()).td().a(".nodelink",
                 "http://"+ nodeHttpAddr,
                 nodeRackName + "/" + nodeHttpAddr);
-        td._(" ").a(".logslink",
-            url("logs", nodeIdString, containerIdString, taid, app.getJob()
-                .getUserName()), "logs");
         td._();
+        row.td().
+          a(".logslink",
+            url("logs", nodeIdString, containerIdString, taid, app.getJob()
+                .getUserName()), "logs")._();
         
         row.td().
           br().$title(String.valueOf(attemptStartTime))._().
@@ -196,6 +198,8 @@ public class HsTaskPage extends HsView {
           th().input("search_init").$type(InputType.text).
               $name("attempt_node").$value("Node")._()._().
           th().input("search_init").$type(InputType.text).
+              $name("attempt_node").$value("Logs")._()._().
+          th().input("search_init").$type(InputType.text).
               $name("attempt_start_time").$value("Start Time")._()._();
       
       if(type == TaskType.REDUCE) {

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Tue Apr 24 19:05:09 2012
@@ -65,7 +65,6 @@ import com.google.inject.Inject;
 public class HsWebServices {
   private final HistoryContext ctx;
   private WebApp webapp;
-  private final Configuration conf;
 
   @Context
   UriInfo uriInfo;
@@ -74,7 +73,6 @@ public class HsWebServices {
   public HsWebServices(final HistoryContext ctx, final Configuration conf,
       final WebApp webapp) {
     this.ctx = ctx;
-    this.conf = conf;
     this.webapp = webapp;
   }
 
@@ -222,7 +220,7 @@ public class HsWebServices {
     Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     ConfInfo info;
     try {
-      info = new ConfInfo(job, this.conf);
+      info = new ConfInfo(job);
     } catch (IOException e) {
       throw new NotFoundException("unable to load configuration for job: "
           + jid);

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Tue Apr 24 19:05:09 2012
@@ -22,12 +22,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import static org.mockito.Mockito.*;
+
 @RunWith(value = Parameterized.class)
 public class TestJobHistoryEntities {
 
@@ -61,10 +64,12 @@ public class TestJobHistoryEntities {
   /* Verify some expected values based on the history file */
   @Test
   public void testCompletedJob() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     //Re-initialize to verify the delayed load.
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     //Verify tasks loaded based on loadTask parameter.
     assertEquals(loadTasks, completedJob.tasksLoaded.get());
     assertEquals(1, completedJob.getAMInfos().size());
@@ -84,9 +89,11 @@ public class TestJobHistoryEntities {
   
   @Test
   public void testCompletedTask() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
     
@@ -111,9 +118,11 @@ public class TestJobHistoryEntities {
   
   @Test
   public void testCompletedTaskAttempt() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
     TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Apr 24 19:05:09 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -84,12 +85,22 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsing() throws Exception {
-    checkHistoryParsing(2, 1, 2);
+    LOG.info("STARTING testHistoryParsing()");
+    try {
+      checkHistoryParsing(2, 1, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsing()");
+    }
   }
   
   @Test
   public void testHistoryParsingWithParseErrors() throws Exception {
-    checkHistoryParsing(3, 0, 2);
+    LOG.info("STARTING testHistoryParsingWithParseErrors()");
+    try {
+      checkHistoryParsing(3, 0, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingWithParseErrors()");
+    }
   }
   
   private static String getJobSummary(FileContext fc, Path path) throws IOException {
@@ -124,61 +135,112 @@ public class TestJobHistoryParsing {
 
     String jobhistoryDir = JobHistoryUtils
         .getHistoryIntermediateDoneDirForUser(conf);
-    JobHistory jobHistory = new JobHistory();
-    jobHistory.init(conf);
-
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
-        .getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils
-        .getDoneFileName(jobIndexInfo);
-
-    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-    FSDataInputStream in = null;
-    LOG.info("JobHistoryFile is: " + historyFilePath);
+    
     FileContext fc = null;
     try {
       fc = FileContext.getFileContext(conf);
-      in = fc.open(fc.makeQualified(historyFilePath));
     } catch (IOException ioe) {
-      LOG.info("Can not open history file: " + historyFilePath, ioe);
-      throw (new Exception("Can not open History File"));
+      LOG.info("Can not get FileContext", ioe);
+      throw (new Exception("Can not get File Context"));
     }
-
-    JobHistoryParser parser = new JobHistoryParser(in);
-    final EventReader realReader = new EventReader(in);
-    EventReader reader = Mockito.mock(EventReader.class);
+    
     if (numMaps == numSuccessfulMaps) {
-      reader = realReader;
-    } else {
-      final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
-      Mockito.when(reader.getNextEvent()).thenAnswer(
-          new Answer<HistoryEvent>() {
-            public HistoryEvent answer(InvocationOnMock invocation) 
-                throws IOException {
-              HistoryEvent event = realReader.getNextEvent();
-              if (event instanceof TaskFinishedEvent) {
-                numFinishedEvents.incrementAndGet();
-              }
-              
-              if (numFinishedEvents.get() <= numSuccessfulMaps) {
-                return event;
-              } else {
-                throw new IOException("test");
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = getJobSummary(fc, summaryFile);
+      Assert.assertNotNull(jobSummaryString);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertEquals("JobName does not match", "test",
+          jobSummaryElements.get("jobName"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+      .assertTrue(
+          "firstReduceTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
+
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+    HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+    JobInfo jobInfo;
+    long numFinishedMaps;
+    
+    synchronized(fileInfo) {
+      Path historyFilePath = fileInfo.getHistoryFile();
+      FSDataInputStream in = null;
+      LOG.info("JobHistoryFile is: " + historyFilePath);
+      try {
+        in = fc.open(fc.makeQualified(historyFilePath));
+      } catch (IOException ioe) {
+        LOG.info("Can not open history file: " + historyFilePath, ioe);
+        throw (new Exception("Can not open History File"));
+      }
+
+      JobHistoryParser parser = new JobHistoryParser(in);
+      final EventReader realReader = new EventReader(in);
+      EventReader reader = Mockito.mock(EventReader.class);
+      if (numMaps == numSuccessfulMaps) {
+        reader = realReader;
+      } else {
+        final AtomicInteger numFinishedEvents = new AtomicInteger(0);  // Hack!
+        Mockito.when(reader.getNextEvent()).thenAnswer(
+            new Answer<HistoryEvent>() {
+              public HistoryEvent answer(InvocationOnMock invocation) 
+              throws IOException {
+                HistoryEvent event = realReader.getNextEvent();
+                if (event instanceof TaskFinishedEvent) {
+                  numFinishedEvents.incrementAndGet();
+                }
+
+                if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                  return event;
+                } else {
+                  throw new IOException("test");
+                }
               }
             }
-          }
         );
-    }
-    
-    JobInfo jobInfo = parser.parse(reader);
-    
-    long numFinishedMaps = 
+      }
+
+      jobInfo = parser.parse(reader);
+
+      numFinishedMaps = 
         computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
-    
-    if (numFinishedMaps != numMaps) {
-      Exception parseException = parser.getParseException();
-      Assert.assertNotNull("Didn't get expected parse exception", 
-          parseException);
+
+      if (numFinishedMaps != numMaps) {
+        Exception parseException = parser.getParseException();
+        Assert.assertNotNull("Didn't get expected parse exception", 
+            parseException);
+      }
     }
     
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
@@ -246,52 +308,6 @@ public class TestJobHistoryParsing {
         }
       }
     }
-
-    if (numMaps == numSuccessfulMaps) {
-
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobId);
-      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-      String jobSummaryString = getJobSummary(fc, summaryFile);
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-      Assert.assertNotNull(jobSummaryString);
-
-      Map<String, String> jobSummaryElements = new HashMap<String, String>();
-      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-      while (strToken.hasMoreTokens()) {
-        String keypair = strToken.nextToken();
-        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
-
-      }
-
-      Assert.assertEquals("JobId does not match", jobId.toString(),
-          jobSummaryElements.get("jobId"));
-      Assert.assertEquals("JobName does not match", "test",
-          jobSummaryElements.get("jobName"));
-      Assert.assertTrue("submitTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-      Assert.assertTrue("launchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-      Assert
-      .assertTrue(
-          "firstReduceTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-      Assert.assertTrue("finishTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
-          Integer.parseInt(jobSummaryElements.get("numMaps")));
-      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
-          Integer.parseInt(jobSummaryElements.get("numReduces")));
-      Assert.assertEquals("User does not match", System.getProperty("user.name"),
-          jobSummaryElements.get("user"));
-      Assert.assertEquals("Queue does not match", "default",
-          jobSummaryElements.get("queue"));
-      Assert.assertEquals("Status does not match", "SUCCEEDED",
-          jobSummaryElements.get("status"));
-    }
   }
   
   // Computes finished maps similar to RecoveryService...
@@ -314,6 +330,8 @@ public class TestJobHistoryParsing {
   
   @Test
   public void testHistoryParsingForFailedAttempts() throws Exception {
+    LOG.info("STARTING testHistoryParsingForFailedAttempts");
+    try {
     Configuration conf = new Configuration();
     conf
         .setClass(
@@ -335,7 +353,7 @@ public class TestJobHistoryParsing {
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
 
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
         .getJobIndexInfo();
     String jobhistoryFileName = FileNameIndexUtils
         .getDoneFileName(jobIndexInfo);
@@ -372,6 +390,9 @@ public class TestJobHistoryParsing {
       }
     }
     Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingForFailedAttempts");
+    }
   }
   
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {

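The summary assertions added above treat the intermediate job summary as a single comma-separated list of key=value pairs. A standalone sketch of that parsing, using only JDK classes; the class and method names are illustrative, not part of this change:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.StringTokenizer;

    // Illustrative only: mirrors the tokenizing done in checkHistoryParsing().
    public class JobSummaryParser {
      static Map<String, String> parse(String jobSummaryString) {
        Map<String, String> elements = new HashMap<String, String>();
        StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
        while (strToken.hasMoreTokens()) {
          String keypair = strToken.nextToken();
          // Split on the first '=' so a value containing '=' stays intact.
          int eq = keypair.indexOf('=');
          if (eq > 0) {
            elements.put(keypair.substring(0, eq), keypair.substring(eq + 1));
          }
        }
        return elements;
      }
    }
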
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml Tue Apr 24 19:05:09 2012
@@ -292,7 +292,7 @@
 <property><!--Loaded from job.xml--><name>yarn.resourcemanager.scheduler.address</name><value>0.0.0.0:8030</value></property>
 <property><!--Loaded from job.xml--><name>fs.trash.checkpoint.interval</name><value>0</value></property>
 <property><!--Loaded from job.xml--><name>s3native.stream-buffer-size</name><value>4096</value></property>
-<property><!--Loaded from job.xml--><name>yarn.scheduler.fifo.minimum-allocation-mb</name><value>1024</value></property>
+<property><!--Loaded from job.xml--><name>yarn.scheduler.minimum-allocation-mb</name><value>128</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.reduce.shuffle.read.timeout</name><value>180000</value></property>
 <property><!--Loaded from job.xml--><name>yarn.app.mapreduce.am.command-opts</name><value>-Xmx500m</value></property>
 <property><!--Loaded from job.xml--><name>mapreduce.admin.user.env</name><value>LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native</value></property>

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue Apr 24 19:05:09 2012
@@ -282,7 +282,7 @@ public class ClientServiceDelegate {
   }
 
   private synchronized Object invoke(String method, Class argClass,
-      Object args) throws YarnRemoteException {
+      Object args) throws IOException {
     Method methodOb = null;
     try {
       methodOb = MRClientProtocol.class.getMethod(method, argClass);
@@ -291,7 +291,11 @@ public class ClientServiceDelegate {
     } catch (NoSuchMethodException e) {
       throw new YarnException("Method name mismatch", e);
     }
-    while (true) {
+    int maxRetries = this.conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+    IOException lastException = null;
+    while (maxRetries > 0) {
       try {
         return methodOb.invoke(getProxy(), args);
       } catch (YarnRemoteException yre) {
@@ -308,13 +312,21 @@ public class ClientServiceDelegate {
             " retrying..", e.getTargetException());
         // Force reconnection by setting the proxy to null.
         realProxy = null;
+        // HS/AMS shut down
+        maxRetries--;
+        lastException = new IOException(e.getMessage());
+        
       } catch (Exception e) {
         LOG.debug("Failed to contact AM/History for job " + jobId
             + "  Will retry..", e);
         // Force reconnection by setting the proxy to null.
         realProxy = null;
+        // RM shutdown
+        maxRetries--;
+        lastException = new IOException(e.getMessage());     
       }
     }
+    throw lastException;
   }
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@@ -364,7 +376,7 @@ public class ClientServiceDelegate {
     return result;
   }
   
-  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
       TypeConverter.toYarn(oldJobID);
     GetJobReportRequest request =
@@ -390,7 +402,7 @@ public class ClientServiceDelegate {
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
-       throws YarnRemoteException, YarnRemoteException {
+       throws IOException{
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
       TypeConverter.toYarn(oldJobID);
     GetTaskReportsRequest request =
@@ -407,7 +419,7 @@ public class ClientServiceDelegate {
   }
 
   public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
-       throws YarnRemoteException {
+       throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
       = TypeConverter.toYarn(taskAttemptID);
     if (fail) {
@@ -423,7 +435,7 @@ public class ClientServiceDelegate {
   }
 
   public boolean killJob(JobID oldJobID)
-       throws YarnRemoteException {
+       throws IOException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
     = TypeConverter.toYarn(oldJobID);
     KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);

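The change above replaces the unbounded while (true) loop in invoke() with one bounded by MRJobConfig.MR_CLIENT_MAX_RETRIES, remembering the last failure so the caller eventually gets an IOException instead of retrying forever. A minimal sketch of that shape in isolation; the class, the Callable wrapper and the seed exception are illustrative, and the real method also handles YarnRemoteException separately and resets its cached proxy between attempts:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Illustrative only: the bounded-retry pattern the new invoke() follows.
    public final class BoundedRetry {
      static <T> T run(int maxRetries, Callable<T> call) throws IOException {
        IOException lastException = new IOException("no attempts were made");
        while (maxRetries > 0) {
          try {
            return call.call();
          } catch (Exception e) {
            // Each failure consumes one attempt and is kept as the cause
            // reported once the retry budget is exhausted.
            maxRetries--;
            lastException = new IOException(e.getMessage(), e);
          }
        }
        throw lastException;
      }
    }
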
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Tue Apr 24 19:05:09 2012
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
@@ -88,12 +87,10 @@ public class ResourceMgrDelegate {
   public ResourceMgrDelegate(YarnConfiguration conf) {
     this.conf = conf;
     YarnRPC rpc = YarnRPC.create(this.conf);
-    InetSocketAddress rmAddress =
-        NetUtils.createSocketAddr(this.conf.get(
+    InetSocketAddress rmAddress = conf.getSocketAddr(
             YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS),
-            YarnConfiguration.DEFAULT_RM_PORT,
-            YarnConfiguration.RM_ADDRESS);
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
     this.rmAddress = rmAddress.toString();
     LOG.debug("Connecting to ResourceManager at " + rmAddress);
     applicationsManager =

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/DistributedFSCheck.java Tue Apr 24 19:05:09 2012
@@ -128,7 +128,7 @@ public class DistributedFSCheck extends 
     if (rootStatus.isFile()) {
       nrFiles++;
       // For a regular file generate <fName,offset> pairs
-      long blockSize = fs.getDefaultBlockSize();
+      long blockSize = fs.getDefaultBlockSize(rootFile);
       long fileLength = rootStatus.getLen();
       for(long offset = 0; offset < fileLength; offset += blockSize)
         writer.append(new Text(rootFile.toString()), new LongWritable(offset));
@@ -160,15 +160,16 @@ public class DistributedFSCheck extends 
                        ) throws IOException {
       // open file
       FSDataInputStream in = null;
+      Path p = new Path(name);
       try {
-        in = fs.open(new Path(name));
+        in = fs.open(p);
       } catch(IOException e) {
         return name + "@(missing)";
       }
       in.seek(offset);
       long actualSize = 0;
       try {
-        long blockSize = fs.getDefaultBlockSize();
+        long blockSize = fs.getDefaultBlockSize(p);
         reporter.setStatus("reading " + name + "@" + 
                            offset + "/" + blockSize);
         for( int curSize = bufferSize; 

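The two edits above move DistributedFSCheck to the per-path FileSystem.getDefaultBlockSize(Path) overload, presumably so the block size used for offsets reflects the file system the path actually resolves to rather than one global default. A small standalone sketch of the offset generation with that overload; the class and method names are illustrative:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: one offset per default block of the given file.
    class BlockOffsets {
      static List<Long> of(FileSystem fs, Path file) throws IOException {
        long blockSize = fs.getDefaultBlockSize(file);  // per-path overload
        long fileLength = fs.getFileStatus(file).getLen();
        List<Long> offsets = new ArrayList<Long>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
          offsets.add(offset);
        }
        return offsets;
      }
    }
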
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Tue Apr 24 19:05:09 2012
@@ -83,7 +83,9 @@ public class TestSocketFactory {
       JobConf jobConf = new JobConf();
       FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
       miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
-      JobConf jconf = new JobConf(cconf);
+      JobConf jconf = new JobConf(miniMRYarnCluster.getConfig());
+      jconf.set("hadoop.rpc.socket.factory.class.default",
+                "org.apache.hadoop.ipc.DummySocketFactory");
       jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
       String rmAddress = jconf.get("yarn.resourcemanager.address");
       String[] split = rmAddress.split(":");

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Tue Apr 24 19:05:09 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
 
     MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
     when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
-        .thenThrow(new RuntimeException("3"))
+        new RuntimeException("1")).thenThrow(new RuntimeException("2"))       
         .thenReturn(getJobReportResponse());
 
     ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
 
     JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
     Assert.assertNotNull(jobStatus);
-    verify(historyServerProxy, times(4)).getJobReport(
+    verify(historyServerProxy, times(3)).getJobReport(
         any(GetJobReportRequest.class));
   }
 
@@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
         any(String.class));
   }
   
+  @Test
+  public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
+    Configuration conf = new YarnConfiguration();
+    testRMDownForJobStatusBeforeGetAMReport(conf,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+  }
+
+  @Test
+  public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
+      throws IOException {
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
+    testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
+  }
+  
+  @Test
+  public void testRMDownRestoreForJobStatusBeforeGetAMReport()
+      throws IOException {
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
+
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+        !isAMReachableFromClient);
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(getJobReportResponse());
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced1"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rmDelegate, oldJobId, historyServerProxy);
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
+    Assert.assertNotNull(jobStatus);
+  }
+
+  private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
+      int noOfRetries) throws YarnRemoteException {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
+        !isAMReachableFromClient);
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced1"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced2"))).thenThrow(
+        new java.lang.reflect.UndeclaredThrowableException(new IOException(
+            "Connection refuced3")));
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rmDelegate, oldJobId, historyServerProxy);
+    try {
+      clientServiceDelegate.getJobStatus(oldJobId);
+      Assert.fail("It should throw exception after retries");
+    } catch (IOException e) {
+      System.out.println("fail to get job status,and e=" + e.toString());
+    }
+    verify(rmDelegate, times(noOfRetries)).getApplicationReport(
+        any(ApplicationId.class));
+  }  
+
   private GetJobReportRequest getJobReportRequest() {
     GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
     request.setJobId(jobId);

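The new tests above stub ResourceMgrDelegate.getApplicationReport to throw UndeclaredThrowableException wrapping an IOException, which appears to mimic how a dynamic-proxy RM client surfaces a checked exception its interface does not declare. A caller that wants the underlying IOException back could unwrap it roughly like this; the helper is illustrative and not part of this change:

    import java.io.IOException;
    import java.lang.reflect.UndeclaredThrowableException;

    // Illustrative only: recover the wrapped checked exception, if any.
    final class Unwrap {
      static IOException ioCause(UndeclaredThrowableException ute) {
        Throwable cause = ute.getCause();
        return (cause instanceof IOException)
            ? (IOException) cause
            : new IOException(cause);
      }
    }
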
Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Apr 24 19:05:09 2012
@@ -173,7 +173,7 @@ public class TestMiniMRClasspath {
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getUri().toString();
       mr = new MiniMRCluster(taskTrackers, namenode, 3);
-      JobConf jobConf = new JobConf();
+      JobConf jobConf = mr.createJobConf();
       String result;
       result = launchWordCount(fileSys.getUri(), jobConf,
           "The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1);
@@ -205,7 +205,7 @@ public class TestMiniMRClasspath {
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getUri().toString();
       mr = new MiniMRCluster(taskTrackers, namenode, 3);      
-      JobConf jobConf = new JobConf();
+      JobConf jobConf = mr.createJobConf();
       String result;
       
       result = launchExternal(fileSys.getUri(), jobConf,

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1329947&r1=1329946&r2=1329947&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Tue Apr 24 19:05:09 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -126,6 +128,10 @@ public class MiniMRYarnCluster extends M
     @Override
     public synchronized void start() {
       try {
+        getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+                        MiniYARNCluster.getHostname() + ":0");
+        getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+                        MiniYARNCluster.getHostname() + ":0");
         historyServer = new JobHistoryServer();
         historyServer.init(getConfig());
         new Thread() {
@@ -145,6 +151,20 @@ public class MiniMRYarnCluster extends M
       } catch (Throwable t) {
         throw new YarnException(t);
       }
+      //need to do this because historyServer.init creates a new Configuration
+      getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+                      historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+      getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+                      historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+
+      LOG.info("MiniMRYARN ResourceManager address: " +
+               getConfig().get(YarnConfiguration.RM_ADDRESS));
+      LOG.info("MiniMRYARN ResourceManager web address: " +
+               getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer address: " +
+               getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer web address: " +
+               getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
     }
 
     @Override