You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/02 08:03:59 UTC

svn commit: r1098495 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/...

Author: sharad
Date: Mon May  2 06:03:58 2011
New Revision: 1098495

URL: http://svn.apache.org/viewvc?rev=1098495&view=rev
Log:
MAPREDUCE-2462. Write job conf along with JobHistory, other minor improvements. Contributed by Siddharth Seth.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May  2 06:03:58 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    MAPREDUCE-2462. Write job conf along with JobHistory, other minor improvements. 
+    (Siddharth Seth via sharad)
 
     Fix to send finish application event only when the application is finished (mahadev)
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Mon May  2 06:03:58 2011
@@ -32,16 +32,17 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -75,36 +76,35 @@ public class JobHistoryEventHandler exte
   private static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
 
-  static final FsPermission HISTORY_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0750); // rwxr-x---
-  
-  public static final FsPermission HISTORY_FILE_PERMISSION =
-    FsPermission.createImmutable((short) 0740); // rwxr-----
-
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
     this.startCount = startCount;
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration)
+   * Initializes the FileContext and Path objects for the log and done directories.
+   * Creates these directories if they do not already exist.
+   */
   @Override
   public void init(Configuration conf) {
-    String defaultLogDir = conf.get(
-        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
-    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
-      defaultLogDir);
-    String defaultDoneDir = conf.get(
-        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
-    String  doneDirPrefix =
-      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
-          defaultDoneDir);
+    
+    String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
+    String  doneDirPrefix = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+
     try {
       doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
           new Path(doneDirPrefix));
       doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
       if (!doneDirFc.util().exists(doneDirPrefixPath)) {
-        doneDirFc.mkdir(doneDirPrefixPath,
-          new FsPermission(HISTORY_DIR_PERMISSION), true);
+        try {
+          doneDirFc.mkdir(doneDirPrefixPath, new FsPermission(
+              JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
+              + "] already exists.");
+        }
       }
     } catch (IOException e) {
           LOG.info("error creating done directory on dfs " + e);
@@ -115,12 +115,16 @@ public class JobHistoryEventHandler exte
           new Path(logDir));
       logDirFc = FileContext.getFileContext(logDirPath.toUri(), conf);
       if (!logDirFc.util().exists(logDirPath)) {
-        logDirFc.mkdir(logDirPath,
-          new FsPermission(HISTORY_DIR_PERMISSION), true);
+        try {
+          logDirFc.mkdir(logDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION),
+              true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("JobHistory Log Directory: [" + doneDirPrefixPath
+              + "] already exists.");
+        }
       }
     } catch (IOException ioe) {
-      LOG.info("Mkdirs failed to create " +
-          logDirPath.toString());
+      LOG.info("Mkdirs failed to create " + logDirPath.toString());
       throw new YarnException(ioe);
     }
     super.init(conf);
@@ -182,23 +186,28 @@ public class JobHistoryEventHandler exte
 
   /**
    * Create an event writer for the Job represented by the jobID.
+   * Writes out the job configuration to the log directory.
    * This should be the first call to history for a job
-   * @param jobId
+   * 
+   * @param jobId the jobId.
    * @throws IOException
    */
   protected void setupEventWriter(JobId jobId)
   throws IOException {
     if (logDirPath == null) {
+      LOG.info("Log Directory is null, returning");
       throw new IOException("Missing Log Directory for History");
     }
 
     MetaInfo oldFi = fileMap.get(jobId);
+    Configuration conf = getConfig();
 
     long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.submitTime);
 
-    Path logFile = getJobHistoryFile(logDirPath, jobId);
     // String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
-    String user = getConfig().get(MRJobConfig.USER_NAME);
+    
+    Path logFile = JobHistoryUtils.getJobHistoryFile(logDirPath, jobId, startCount);
+    String user = conf.get(MRJobConfig.USER_NAME);
     if (user == null) {
       throw new IOException("User is null while setting up jobhistory eventwriter" );
     }
@@ -207,16 +216,36 @@ public class JobHistoryEventHandler exte
  
     if (writer == null) {
       try {
-        FSDataOutputStream out = logDirFc.create(logFile, EnumSet
-            .of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+        FSDataOutputStream out = logDirFc.create(logFile,
+            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+        //TODO Permissions for the history file?
         writer = new EventWriter(out);
       } catch (IOException ioe) {
-        LOG.info("Could not create log file for job " + jobName);
+        LOG.info("Could not create log file: [" + logFile + "] + for job " + "[" + jobName + "]");
         throw ioe;
       }
     }
-    /*TODO Storing the job conf on the log dir if required*/
-    MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
+    
+    //This could be done at the end as well in moveToDone
+    Path logDirConfPath = null;
+    if (conf != null) {
+      logDirConfPath = getConfFile(logDirPath, jobId);
+      LOG.info("XXX: Attempting to write config to: " + logDirConfPath);
+      FSDataOutputStream jobFileOut = null;
+      try {
+        if (logDirConfPath != null) {
+          jobFileOut = logDirFc.create(logDirConfPath,
+              EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+          conf.writeXml(jobFileOut);
+          jobFileOut.close();
+        }
+      } catch (IOException e) {
+        LOG.info("Failed to close the job configuration file "
+            + StringUtils.stringifyException(e));
+      }
+    }
+    
+    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
     fileMap.put(jobId, fi);
   }
 
@@ -228,6 +257,7 @@ public class JobHistoryEventHandler exte
       if (mi != null) {
         mi.closeWriter();
       }
+      
     } catch (IOException e) {
       LOG.info("Error closing writer for JobID: " + id);
       throw e;
@@ -260,19 +290,11 @@ public class JobHistoryEventHandler exte
       mi.writeEvent(historyEvent);
       LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
     } catch (IOException e) {
-      LOG.error("in handler ioException " + e);
+      LOG.error("Error writing History Event " + e);
       throw new YarnException(e);
     }
     // check for done
     if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
-      JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
-      String statusstoredir = doneDirPrefixPath + "/status/" + mi.user + "/" + mi.jobName;
-      try {
-        writeStatus(statusstoredir, jfe);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        throw new YarnException(e);
-      }
       try {
         closeEventWriter(event.getJobID());
       } catch (IOException e) {
@@ -283,27 +305,45 @@ public class JobHistoryEventHandler exte
 
   protected void closeEventWriter(JobId jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
+    
     try {
-      Path logFile = mi.getHistoryFile();
+      if (mi != null) {
+        mi.closeWriter();
+      }
+     
+      if (mi == null || mi.getHistoryFile() == null) {
+        LOG.info("No file for job-history with " + jobId + " found in cache!");
+      }
+      if (mi.getConfFile() == null) {
+        LOG.info("No file for jobconf with " + jobId + " found in cache!");
+      }
+      
+      
       //TODO fix - add indexed structure 
       // 
-      String doneDir = doneDirPrefixPath + "/" + mi.user + "/";
+      String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath.toString());
       Path doneDirPath =
     	  doneDirFc.makeQualified(new Path(doneDir));
       if (!pathExists(doneDirFc, doneDirPath)) {
-        doneDirFc.mkdir(doneDirPath, new FsPermission(HISTORY_DIR_PERMISSION),
-            true);
+        try {
+          doneDirFc.mkdir(doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+        } catch (FileAlreadyExistsException e) {
+          LOG.info("Done directory: [" + doneDirPath + "] already exists.");
       }
-      // Path localFile = new Path(fromLocalFile);
-      Path qualifiedLogFile =
-    	  logDirFc.makeQualified(logFile);
-      Path qualifiedDoneFile =
-    	  doneDirFc.makeQualified(new Path(doneDirPath, mi.jobName));
-      if (mi != null) {
-        mi.closeWriter();
       }
+      Path logFile = mi.getHistoryFile();
+      Path qualifiedLogFile = logDirFc.makeQualified(logFile);
+      Path qualifiedDoneFile = doneDirFc.makeQualified(new Path(doneDirPath,
+          getDoneJobHistoryFileName(jobId)));
       moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+      
+      Path confFile = mi.getConfFile();
+      Path qualifiedConfFile = logDirFc.makeQualified(confFile);
+      Path qualifiedConfDoneFile = doneDirFc.makeQualified(new Path(doneDirPath, getDoneConfFileName(jobId)));
+      moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
+      
       logDirFc.delete(qualifiedLogFile, true);
+      logDirFc.delete(qualifiedConfFile, true);
     } catch (IOException e) {
       LOG.info("Error closing writer for JobID: " + jobId);
       throw e;
@@ -312,14 +352,16 @@ public class JobHistoryEventHandler exte
 
   private static class MetaInfo {
     private Path historyFile;
+    private Path confFile;
     private EventWriter writer;
     long submitTime;
     String user;
     String jobName;
 
-    MetaInfo(Path historyFile, EventWriter writer, long submitTime,
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
              String user, String jobName) {
       this.historyFile = historyFile;
+      this.confFile = conf;
       this.writer = writer;
       this.submitTime = submitTime;
       this.user = user;
@@ -328,6 +370,8 @@ public class JobHistoryEventHandler exte
 
     Path getHistoryFile() { return historyFile; }
 
+    Path getConfFile() {return confFile; } 
+
     synchronized void closeWriter() throws IOException {
       if (writer != null) {
         writer.close();
@@ -343,18 +387,30 @@ public class JobHistoryEventHandler exte
     }
   }
 
-  /**
-   * Get the job history file path
-   */
-  private Path getJobHistoryFile(Path dir, JobId jobId) {
-    return new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" + 
-        startCount);
+  //TODO Move some of these functions into a utility class.
+
 
+  private String getDoneJobHistoryFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
   }
 
-/*
- * 
- */
+  private String getDoneConfFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.CONF_FILE_NAME_SUFFIX;
+  }
+  
+  private Path getConfFile(Path logDir, JobId jobId) {
+    Path jobFilePath = null;
+    if (logDir != null) {
+      jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
+          + "_" + startCount + JobHistoryUtils.CONF_FILE_NAME_SUFFIX);
+    }
+    return jobFilePath;
+  }
+
+
+  //TODO This could be done by the jobHistory server - move files to a temporary location
+  //  which is scanned by the JH server - to move them to the final location.
+  // Currently JHEventHandler is moving files to the final location.
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
     //check if path exists, in case of retries it may not exist
     if (logDirFc.util().exists(fromPath)) {
@@ -370,7 +426,7 @@ public class JobHistoryEventHandler exte
       else 
           LOG.info("copy failed");
       doneDirFc.setPermission(toPath,
-          new FsPermission(HISTORY_FILE_PERMISSION));
+          new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
     }
   }
 
@@ -382,7 +438,7 @@ public class JobHistoryEventHandler exte
     try {
       Path statusstorepath = doneDirFc.makeQualified(new Path(statusstoredir));
       doneDirFc.mkdir(statusstorepath,
-         new FsPermission(HISTORY_DIR_PERMISSION), true);
+         new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
       Path toPath = new Path(statusstoredir, "jobstats");
       FSDataOutputStream out = doneDirFc.create(toPath, EnumSet
            .of(CreateFlag.CREATE, CreateFlag.OVERWRITE));

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon May  2 06:03:58 2011
@@ -680,7 +680,6 @@ public class JobImpl implements org.apac
 
         org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId 
             attemptID = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
-        //TODO_get.set
         attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
         attemptID.getTaskId().setJobId(job.jobId);
         attemptID.getTaskId().setTaskType(TaskType.MAP);//TODO:fix task type ??
@@ -694,11 +693,12 @@ public class JobImpl implements org.apac
         job.committer = outputFormat.getOutputCommitter(taskContext);
 
         //log to job history
+        //TODO_JH_Validate the values being sent here (along with defaults). Ideally for all JH evnts.
         JobSubmittedEvent jse =
           new JobSubmittedEvent(job.oldJobId, 
               job.conf.get(MRJobConfig.JOB_NAME, "test"), 
               job.conf.get(MRJobConfig.USER_NAME,"mapred"), job.startTime,
-              "test", constructJobACLs(job.conf), 
+              job.remoteJobConfFile.toString(), constructJobACLs(job.conf), 
               job.conf.get(MRJobConfig.QUEUE_NAME,"test"));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon May  2 06:03:58 2011
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -150,19 +151,21 @@ public class RecoveryService extends Com
   private void parse() throws IOException {
     // TODO: parse history file based on startCount
     String jobName = TypeConverter.fromYarn(appID).toString();
-    String defaultStagingDir = getConfig().get(
-        YARNApplicationConstants.APPS_STAGING_DIR_KEY)
-        + "/history/staging";
-    String jobhistoryDir = getConfig().get(
-        YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+//    String defaultStagingDir = getConfig().get(
+//        YARNApplicationConstants.APPS_STAGING_DIR_KEY)
+//        + "/history/staging";
+    
+//    String jobhistoryDir = getConfig().get(
+//        YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(getConfig());
     FSDataInputStream in = null;
     Path historyFile = null;
     Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
         new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
         getConfig());
-    historyFile = fc.makeQualified(new Path(histDirPath, jobName + "_" + 
-        (startCount -1))); //read the previous history file
+    historyFile = fc.makeQualified(JobHistoryUtils.getJobHistoryFile(
+        histDirPath, jobName, startCount - 1));          //read the previous history file
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Mon May  2 06:03:58 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.Ya
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
 import org.junit.Test;
 
 public class TestJobHistoryParsing {
@@ -55,16 +56,18 @@ public class TestJobHistoryParsing {
     app.waitForState(job, JobState.SUCCEEDED);
     app.stop();
     
-    String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
+    String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
     String user =
       conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     String jobhistoryDir = conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
-        "file:///tmp/yarn/done/") + user; 
+        "file:///tmp/yarn/done/"); 
     String jobstatusDir = conf.get(STATUS_STORE_DIR_KEY,
-        "file:///tmp/yarn/done/status/") + user + "/" +
-        jobhistoryFileName;
+        "file:///tmp/yarn/done/status/")  + jobhistoryFileName;
+    
+    String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+    
     FSDataInputStream in = null;
-    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    Path historyFilePath = new Path(currentJobHistoryDir, jobhistoryFileName);
     LOG.info("JOBHISTORYDIRE IS " + historyFilePath);
     try {
       FileContext fc = FileContext.getFileContext(historyFilePath.toUri());
@@ -98,22 +101,22 @@ public class TestJobHistoryParsing {
       Assert.assertTrue("total number of task attempts ", 
           taskAttemptCount == 1);
     }
-
-   // Test for checking jobstats for job status store
-    Path statusFilePath = new Path(jobstatusDir, "jobstats");
-    try {
-      FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
-      in = fc.open(statusFilePath);
-    } catch (IOException ioe) {
-      LOG.info("Can not open status file "+ ioe);
-      throw (new Exception("Can not open status File"));
-    }
-    parser = new JobHistoryParser(in);
-    jobInfo = parser.parse();
-    Assert.assertTrue("incorrect finishedMap in job stats file ",
-        jobInfo.getFinishedMaps() == 2);
-    Assert.assertTrue("incorrect finishedReduces in job stats file ",
-        jobInfo.getFinishedReduces() == 1);
+//
+//   // Test for checking jobstats for job status store
+//    Path statusFilePath = new Path(jobstatusDir, "jobstats");
+//    try {
+//      FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
+//      in = fc.open(statusFilePath);
+//    } catch (IOException ioe) {
+//      LOG.info("Can not open status file "+ ioe);
+//      throw (new Exception("Can not open status File"));
+//    }
+//    parser = new JobHistoryParser(in);
+//    jobInfo = parser.parse();
+//    Assert.assertTrue("incorrect finishedMap in job stats file ",
+//        jobInfo.getFinishedMaps() == 2);
+//    Assert.assertTrue("incorrect finishedReduces in job stats file ",
+//        jobInfo.getFinishedReduces() == 1);
   }
 
   public static void main(String[] args) throws Exception {

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java?rev=1098495&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java Mon May  2 06:03:58 2011
@@ -0,0 +1,64 @@
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+public class JobHistoryUtils {
+  public static final FsPermission HISTORY_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0750); // rwxr-x---
+  
+  public static final FsPermission HISTORY_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0740); // rwxr-----
+  
+  public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+  
+  public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
+  
+  public static final int VERSION = 4;
+
+  public static final String LOG_VERSION_STRING = "version-" + VERSION;
+
+  
+  public static String getCurrentDoneDir(String doneDirPrefix) {
+    return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
+  }
+
+  public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
+    String defaultLogDir = conf.get(
+        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
+    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+      defaultLogDir);
+    return logDir;
+  }
+  
+  public static String getConfiguredHistoryDoneDirPrefix(Configuration conf) {
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+    String  doneDirPrefix =
+      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+          defaultDoneDir);
+    return doneDirPrefix;
+  }
+  
+  /**
+   * Get the job history file path
+   */
+  public static Path getJobHistoryFile(Path dir, JobId jobId, int attempt) {
+    return getJobHistoryFile(dir, TypeConverter.fromYarn(jobId).toString(), attempt);
+  }
+  
+  /**
+   * Get the job history file path
+   */
+  public static Path getJobHistoryFile(Path dir, String jobId, int attempt) {
+    return new Path(dir, jobId + "_" + 
+        attempt + JOB_HISTORY_FILE_EXTENSION);
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Mon May  2 06:03:58 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -68,8 +70,11 @@ public class CompletedJob implements org
   private TaskAttemptCompletionEvent[] completionEvents;
   private JobInfo jobInfo;
 
-
   public CompletedJob(Configuration conf, JobId jobId) throws IOException {
+    this(conf, jobId, true);
+  }
+
+  public CompletedJob(Configuration conf, JobId jobId, boolean loadTasks) throws IOException {
     this.conf = conf;
     this.jobId = jobId;
     //TODO fix
@@ -95,7 +100,7 @@ public class CompletedJob implements org
     */
     
     //TODO: load the data lazily. for now load the full data upfront
-    loadFullHistoryData();
+    loadFullHistoryData(loadTasks);
 
     counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
     diagnostics.add(jobInfo.getErrorInfo());
@@ -153,7 +158,7 @@ public class CompletedJob implements org
   }
 
   //History data is leisurely loaded when task level data is requested
-  private synchronized void loadFullHistoryData() {
+  private synchronized void loadFullHistoryData(boolean loadTasks) {
     if (jobInfo != null) {
       return; //data already loaded
     }
@@ -162,29 +167,32 @@ public class CompletedJob implements org
       LOG.error("user null is not allowed");
     }
     String jobName = TypeConverter.fromYarn(jobId).toString();
-    String defaultDoneDir = conf.get(
-        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
-    String  jobhistoryDir =
-      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir)
-        + "/" + user;
+    
+    String  jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+      
+    
+    String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+    
     FSDataInputStream in = null;
     Path historyFile = null;
     try {
       Path doneDirPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(jobhistoryDir));
+          new Path(currentJobHistoryDir));
       FileContext fc =
         FileContext.getFileContext(doneDirPath.toUri(),conf);
+      //TODO_JH_There could be multiple instances
+      //TODO_JH_FileName
       historyFile =
-        fc.makeQualified(new Path(doneDirPath, jobName));
+        fc.makeQualified(new Path(doneDirPath, jobName + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
       in = fc.open(historyFile);
       JobHistoryParser parser = new JobHistoryParser(in);
       jobInfo = parser.parse();
-      LOG.info("jobInfo loaded");
     } catch (IOException e) {
       throw new YarnException("Could not load history file " + historyFile,
           e);
     }
     
+    if (loadTasks) {
     // populate the tasks
     for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
         .getAllTasks().entrySet()) {
@@ -198,9 +206,11 @@ public class CompletedJob implements org
         reduceTasks.put(task.getID(), task);
       }
     }
+    }
     
     // TODO: populate the TaskAttemptCompletionEvent
     completionEvents = new TaskAttemptCompletionEvent[0];
+    LOG.info("TaskInfo loaded");
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon May  2 06:03:58 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
@@ -67,8 +68,13 @@ public class CompletedTaskAttempt implem
 
   @Override
   public ContainerId getAssignedContainerID() {
-    // TODO Auto-generated method stub
-    return null;
+    //TODO ContainerId needs to be part of some historyEvent to be able to render the log directory.
+    ContainerId containerId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+    containerId.setId(-1);
+    containerId.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+    containerId.getAppId().setId(-1);
+    containerId.getAppId().setClusterTimestamp(-1);
+    return containerId;
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Mon May  2 06:03:58 2011
@@ -19,10 +19,8 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -34,22 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.Clock;
 
 /*
  * Loads and manages the Job history cache.
@@ -109,13 +102,13 @@ public class JobHistory implements Histo
   public Map<JobId, Job> getAllJobs() {
     //currently there is 1 to 1 mapping between app and job id
     Map<JobId, Job> jobs = new HashMap<JobId, Job>();
-    String defaultDoneDir = conf.get(
-        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
-    String  jobhistoryDir =
-      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+    
+    String currentJobHistoryDoneDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+    
     try {
       Path done = FileContext.getFileContext(conf).makeQualified(
-          new Path(jobhistoryDir));
+          new Path(currentJobHistoryDoneDir));
       FileContext doneDirFc = FileContext.getFileContext(done.toUri(), conf);
       RemoteIterator<LocatedFileStatus> historyFiles = doneDirFc.util()
           .listFiles(done, true);
@@ -124,12 +117,16 @@ public class JobHistory implements Histo
         while (historyFiles.hasNext()) {
           f = historyFiles.next();
           if (f.isDirectory()) continue;
-          String jobName = f.getPath().getName();
+          if (!f.getPath().getName().endsWith(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION)) continue;
+          //TODO_JH_Change to parse the name properly
+          String fileName = f.getPath().getName();
+          String jobName = fileName.substring(0, fileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
+          LOG.info("Processing job: " + jobName);
           org.apache.hadoop.mapreduce.JobID oldJobID = JobID.forName(jobName);
           JobId jobID = TypeConverter.toYarn(oldJobID);
-          Job job = new CompletedJob(conf, jobID);
+          Job job = new CompletedJob(conf, jobID, false);
           jobs.put(jobID, job);
-          completedJobCache.put(jobID, job);
+          // completedJobCache.put(jobID, job);
         }
       }
     } catch (IOException ie) {
@@ -151,6 +148,7 @@ public class JobHistory implements Histo
     return userName;
   }
   
+  
  @Override
  public Clock getClock() {
    return null;