You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/03/24 05:31:18 UTC

svn commit: r1084841 - in /hadoop/mapreduce/branches/MR-279: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/h...

Author: mahadev
Date: Thu Mar 24 04:31:18 2011
New Revision: 1084841

URL: http://svn.apache.org/viewvc?rev=1084841&view=rev
Log:
MAPREDUCE-2403 MR-279: Improve job history event handling in AM to log to HDFS. Contributed by Krishna Ramachandran.

Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh
    hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh
    hadoop/mapreduce/branches/MR-279/yarn/bin/yarn

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 24 04:31:18 2011
@@ -18,28 +18,30 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
 import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
  * The job history events get routed to this class. This class writes the 
@@ -48,21 +50,23 @@ import org.apache.hadoop.mapreduce.jobhi
  * JobHistory implementation is in this package to access package private 
  * classes.
  */
-public class JobHistoryEventHandler 
+public class JobHistoryEventHandler extends AbstractService
     implements EventHandler<JobHistoryEvent> {
 
   private FileContext logDirFc; // log Dir FileContext
   private FileContext doneDirFc; // done Dir FileContext
-  private Configuration conf;
 
-  private Path logDir = null;
-  private Path done = null; // folder for completed jobs
+  private Path logDirPath = null;
+  private Path doneDirPrefixPath = null; // folder for completed jobs
+
+  private BlockingQueue<JobHistoryEvent> eventQueue =
+    new LinkedBlockingQueue<JobHistoryEvent>();
+  private Thread eventHandlingThread;
+  private volatile boolean stopped;
 
   private static final Log LOG = LogFactory.getLog(
       JobHistoryEventHandler.class);
 
-  private EventWriter eventWriter = null;
-
   private static final Map<JobID, MetaInfo> fileMap =
     Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
 
@@ -72,42 +76,81 @@ public class JobHistoryEventHandler 
   public static final FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
 
-  public JobHistoryEventHandler(Configuration conf) {
-    this.conf = conf;
-/*
-    String localDir = conf.get("yarn.server.nodemanager.jobhistory",
-        "file:///" +
-        new File(System.getProperty("yarn.log.dir")).getAbsolutePath() +
-        File.separator + "history");
-*/
-    String localDir = conf.get("yarn.server.nodemanager.jobhistory.localdir",
-      "file:///tmp/yarn");
-    logDir = new Path(localDir);
-    String  doneLocation =
-      conf.get("yarn.server.nodemanager.jobhistory",
-      "file:///tmp/yarn/done");
-    if (doneLocation != null) {
-      try {
-        done = FileContext.getFileContext(conf).makeQualified(new Path(doneLocation));
-        doneDirFc = FileContext.getFileContext(done.toUri(), conf);
-        if (!doneDirFc.util().exists(done))
-          doneDirFc.mkdir(done,
-            new FsPermission(HISTORY_DIR_PERMISSION), true);
-        } catch (IOException e) {
+  public JobHistoryEventHandler() {
+    super("JobHistoryEventHandler");
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    String defaultLogDir = conf.get(
+        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
+    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+      defaultLogDir);
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+    String  doneDirPrefix =
+      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+          defaultDoneDir);
+    try {
+      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(doneDirPrefix));
+      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+      if (!doneDirFc.util().exists(doneDirPrefixPath)) {
+        doneDirFc.mkdir(doneDirPrefixPath,
+          new FsPermission(HISTORY_DIR_PERMISSION), true);
+      }
+    } catch (IOException e) {
           LOG.info("error creating done directory on dfs " + e);
           throw new YarnException(e);
-      }
     }
     try {
-      logDirFc = FileContext.getFileContext(logDir.toUri(), conf);
-      if (!logDirFc.util().exists(logDir)) {
-        logDirFc.mkdir(logDir, new FsPermission(HISTORY_DIR_PERMISSION), true);
+      logDirPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(logDir));
+      logDirFc = FileContext.getFileContext(logDirPath.toUri(), conf);
+      if (!logDirFc.util().exists(logDirPath)) {
+        logDirFc.mkdir(logDirPath,
+          new FsPermission(HISTORY_DIR_PERMISSION), true);
       }
     } catch (IOException ioe) {
       LOG.info("Mkdirs failed to create " +
-          logDir.toString());
+          logDirPath.toString());
       throw new YarnException(ioe);
     }
+    super.init(conf);
+    start();
+  }
+
+  @Override
+  public void start() {
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        JobHistoryEvent event = null;
+        while (!stopped || !Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+          handleEvent(event);
+        }
+      }
+    });
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+    eventHandlingThread.interrupt();
+    try {
+      eventHandlingThread.join();
+    } catch (InterruptedException ie) {
+      LOG.info("Interruped Exception while stopping", ie);
+    }
+    super.stop();
   }
 
   /**
@@ -118,7 +161,7 @@ public class JobHistoryEventHandler 
    */
   protected void setupEventWriter(JobID jobId)
   throws IOException {
-    if (logDir == null) {
+    if (logDirPath == null) {
       throw new IOException("Missing Log Directory for History");
     }
 
@@ -126,9 +169,9 @@ public class JobHistoryEventHandler 
 
     long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
 
-    Path logFile = getJobHistoryFile(logDir, jobId);
+    Path logFile = getJobHistoryFile(logDirPath, jobId);
     // String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
-    String user = conf.get(MRJobConfig.USER_NAME);
+    String user = getConfig().get(MRJobConfig.USER_NAME);
     if (user == null) {
       throw new IOException("User is null while setting up jobhistory eventwriter" );
     }
@@ -145,7 +188,6 @@ public class JobHistoryEventHandler 
         throw ioe;
       }
     }
-    this.eventWriter = writer;
     /*TODO Storing the job conf on the log dir if required*/
     MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
     fileMap.put(jobId, fi);
@@ -165,7 +207,16 @@ public class JobHistoryEventHandler 
     }
   }
 
-  public synchronized void handle(JobHistoryEvent event) {
+  @Override
+  public void handle(JobHistoryEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  protected void handleEvent(JobHistoryEvent event) {
     // check for first event from a job
     if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
       try {
@@ -180,6 +231,7 @@ public class JobHistoryEventHandler 
     try {
       HistoryEvent historyEvent = event.getHistoryEvent();
       mi.writeEvent(historyEvent);
+      LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
     } catch (IOException e) {
       LOG.error("in handler ioException " + e);
       throw new YarnException(e);
@@ -187,7 +239,7 @@ public class JobHistoryEventHandler 
     // check for done
     if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
       JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
-      String statusstoredir = done + "/status/" + mi.user + "/" + mi.jobName;
+      String statusstoredir = doneDirPrefixPath + "/status/" + mi.user + "/" + mi.jobName;
       try {
         writeStatus(statusstoredir, jfe);
       } catch (IOException e) {
@@ -205,23 +257,26 @@ public class JobHistoryEventHandler 
   protected void closeEventWriter(JobID jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
     try {
-      Path fromLocalFile = mi.getHistoryFile();
-      // Path toPath = new Path(done, mi.jobName);
-      String jobhistorydir = done + "/" + mi.user + "/";
-      Path jobhistorydirpath =
-    	  logDirFc.makeQualified(new Path(jobhistorydir));
-      logDirFc.mkdir(jobhistorydirpath,
-         new FsPermission(HISTORY_DIR_PERMISSION), true);
+      Path logFile = mi.getHistoryFile();
+      //TODO fix - add indexed structure 
+      // 
+      String doneDir = doneDirPrefixPath + "/" + mi.user + "/";
+      Path doneDirPath =
+    	  doneDirFc.makeQualified(new Path(doneDir));
+      if (!pathExists(doneDirFc, doneDirPath)) {
+        doneDirFc.mkdir(doneDirPath, new FsPermission(HISTORY_DIR_PERMISSION),
+            true);
+      }
       // Path localFile = new Path(fromLocalFile);
-      Path localQualifiedFile =
-    	  logDirFc.makeQualified(fromLocalFile);
-      Path jobHistoryFile =
-    	  logDirFc.makeQualified(new Path(jobhistorydirpath, mi.jobName));
+      Path qualifiedLogFile =
+    	  logDirFc.makeQualified(logFile);
+      Path qualifiedDoneFile =
+    	  doneDirFc.makeQualified(new Path(doneDirPath, mi.jobName));
       if (mi != null) {
         mi.closeWriter();
       }
-      moveToDoneNow(localQualifiedFile, jobHistoryFile);
-      logDirFc.delete(localQualifiedFile, true);
+      moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+      logDirFc.delete(qualifiedLogFile, true);
     } catch (IOException e) {
       LOG.info("Error closing writer for JobID: " + jobId);
       throw e;
@@ -287,7 +342,11 @@ public class JobHistoryEventHandler 
       doneDirFc.setPermission(toPath,
           new FsPermission(HISTORY_FILE_PERMISSION));
     }
-  }  
+  }
+
+  boolean pathExists(FileContext fc, Path path) throws IOException {
+    return fc.util().exists(path);
+  }
 
   private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
     try {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Mar 24 04:31:18 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -63,6 +64,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -71,8 +73,6 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
 
 /**
  * The Map-Reduce Application Master.
@@ -150,6 +150,10 @@ public class MRAppMaster extends Composi
     containerAllocator = createContainerAllocator(clientService, context);
     addIfService(containerAllocator);
 
+    //service to log job history events
+    EventHandler<JobHistoryEvent> historyService = 
+      createJobHistoryHandler(conf);
+
     //register the event dispatchers
     dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
     dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
@@ -158,6 +162,8 @@ public class MRAppMaster extends Composi
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        historyService);
     
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -189,13 +195,9 @@ public class MRAppMaster extends Composi
 
   protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
       Configuration conf) {
-    return new EventHandler<JobHistoryEvent>() {
-      @Override
-      public void handle(JobHistoryEvent event) {
-      }
-    };
-    //TODO use the real job history handler.
-    //return new JobHistoryEventHandler(conf);
+    JobHistoryEventHandler eventHandler = new JobHistoryEventHandler();
+    eventHandler.init(conf);
+    return eventHandler;
   }
 
   protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -366,8 +368,6 @@ public class MRAppMaster extends Composi
             taskAttemptListener, jobTokenSecretManager, fsTokens);
     ((RunningAppContext) context).jobs.put(job.getID(), job);
 
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        createJobHistoryHandler(config));
     dispatcher.register(JobFinishEvent.Type.class,
         new EventHandler<JobFinishEvent>() {
           @Override

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Mar 24 04:31:18 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.WrappedJ
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -87,6 +88,10 @@ public class MRApp extends MRAppMaster {
   public Job submit(Configuration conf) throws Exception {
     String user = conf.get(MRJobConfig.USER_NAME, "mapred");
     conf.set(MRJobConfig.USER_NAME, user);
+    conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+       "file:///tmp/yarn/");
+    conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+       "file:///tmp/yarn/done/");
     init(conf);
     start();
     Job job = getContext().getAllJobs().values().iterator().next();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Thu Mar 24 04:31:18 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -43,11 +44,13 @@ import org.junit.Test;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
-
+  //TODO FIX once final CompletedStatusStore is available
+  private static final String STATUS_STORE_DIR_KEY =
+    "yarn.server.nodemanager.jobstatus";
   @Test
   public void testHistoryParsing() throws Exception {
     Configuration conf = new Configuration();
-    MRApp app = new HistoryEnabledApp(2, 1, true);
+    MRApp app = new MRApp(2, 1, true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobID jobId = job.getID();
@@ -57,9 +60,9 @@ public class TestJobHistoryParsing {
     String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
     String user =
       conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
-    String jobhistoryDir = conf.get("yarn.server.nodemanager.jobhistory",
+    String jobhistoryDir = conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
         "file:///tmp/yarn/done/") + user; 
-    String jobstatusDir = conf.get("yarn.server.nodemanager.jobhistory",
+    String jobstatusDir = conf.get(STATUS_STORE_DIR_KEY,
         "file:///tmp/yarn/done/status/") + user + "/" +
         jobhistoryFileName;
     FSDataInputStream in = null;
@@ -123,10 +126,9 @@ public class TestJobHistoryParsing {
     @Override
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
         Configuration conf) {
-      return new JobHistoryEventHandler(conf);
+      return new JobHistoryEventHandler();
     }
   }
-
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Thu Mar 24 04:31:18 2011
@@ -35,4 +35,16 @@ public class YarnMRJobConfig {
       = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
   public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
       = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
+  public static final String HS_PREFIX = "yarn.server.historyserver.";
+
+  public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
+
+  /** host:port address to bind to **/
+  public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
+
+  public static final String HISTORY_STAGING_DIR_KEY =
+       "yarn.historyfile.stagingDir";
+
+  public static final String HISTORY_DONE_DIR_KEY =
+       "yarn.historyfile.doneDir";
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Mar 24 04:31:18 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.TaskID;
 import org.apache.hadoop.mapreduce.v2.api.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 
 /**
  * This module is responsible for talking to the 
@@ -75,8 +76,8 @@ public class HistoryClientService extend
   public void start() {
     Configuration conf = new Configuration(getConfig());
     YarnRPC rpc = YarnRPC.create(conf);
-    String serviceAddr = conf.get("jobhistory.server.hostname") + ":"
-        + conf.get("jobhistory.server.port");
+    String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
+        YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
     InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr);
     InetAddress hostNameResolved = null;
     try {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Thu Mar 24 04:31:18 2011
@@ -27,11 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.JobID;
 import org.apache.hadoop.mapreduce.v2.api.JobState;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
@@ -39,6 +35,11 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.TaskID;
 import org.apache.hadoop.mapreduce.v2.api.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -48,8 +49,8 @@ public class TestJobHistoryEvents {
   @Test
   public void testHistoryEvents() throws Exception {
     Configuration conf = new Configuration();
-    conf.set("mapreduce.job.user.name", "test");
-    MRApp app = new HistoryEnabledApp(2, 1, true);
+    conf.set(MRJobConfig.USER_NAME, "test");
+    MRApp app = new MRApp(2, 1, true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobID jobId = job.getID();
@@ -108,7 +109,7 @@ public class TestJobHistoryEvents {
     @Override
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
         Configuration conf) {
-      return new JobHistoryEventHandler(conf);
+      return new JobHistoryEventHandler();
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 24 04:31:18 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.YarnRemote
 import org.apache.hadoop.mapreduce.v2.api.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -71,8 +72,8 @@ public class ClientServiceDelegate {
   private void refreshProxy() throws AvroRemoteException {
     ApplicationMaster appMaster = rm.getApplicationMaster(appId);
     if (ApplicationState.COMPLETED.equals(appMaster.state)) {
-      serviceAddr = conf.get("jobhistory.server.hostname") + ":"
-      + conf.get("jobhistory.server.port");
+      String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
+          YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
       LOG.debug("Reconnecting to job history server " + serviceAddr);
     } else {
       /* TODO check to confirm its really launched */

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 24 04:31:18 2011
@@ -56,12 +56,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.junit.Test;
 
 public class TestClientRedirect {
 
   private static final Log LOG = LogFactory.getLog(TestClientRedirect.class);
   private static final String RMADDRESS = "0.0.0.0:8054";
+  private static final String AMHOSTADDRESS = "0.0.0.0:10020";
   private static final String AMHOSTNAME = "0.0.0.0";
   private static final int AMPORT = 10020;
   private boolean firstRedirect = false; 
@@ -72,8 +74,7 @@ public class TestClientRedirect {
     
     Configuration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
-    conf.set("jobhistory.server.hostname", AMHOSTNAME);
-    conf.setInt("jobhistory.server.port", AMPORT);
+    conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, AMHOSTADDRESS);
     RMService rmService = new RMService("test");
     rmService.init(conf);
     rmService.start();
@@ -182,7 +183,7 @@ public class TestClientRedirect {
     public void start(Configuration conf) {
       YarnRPC rpc = YarnRPC.create(conf);
       //TODO : use fixed port ??
-      InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTNAME + ":" + AMPORT);
+      InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTADDRESS);
       InetAddress hostNameResolved = null;
       try {
         address.getAddress();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Mar 24 04:31:18 2011
@@ -70,7 +70,10 @@ public class MiniMRYarnCluster extends M
         "apps_staging_dir/${user.name}/").getAbsolutePath());
     conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                              // which shuffle doesn't happen
-    
+    conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+        "file:///tmp/yarn/");
+    conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+        "file:///tmp/yarn/done/");
     //configure the shuffle service in NM
     conf.setStrings(AuxServices.AUX_SERVICES,
         new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });

Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh Thu Mar 24 04:31:18 2011
@@ -28,3 +28,5 @@ bin=`cd "$bin"; pwd`
 "$bin"/yarn-daemon.sh --config $YARN_CONF_DIR  start resourcemanager
 # start nodeManager
 "$bin"/yarn-daemons.sh --config $YARN_CONF_DIR  start nodemanager
+# start historyserver
+#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR  start historyserver

Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh Thu Mar 24 04:31:18 2011
@@ -28,3 +28,6 @@ bin=`cd "$bin"; pwd`
 "$bin"/yarn-daemon.sh --config $YARN_CONF_DIR  stop resourcemanager
 # stop nodeManager
 "$bin"/yarn-daemons.sh --config $YARN_CONF_DIR  stop nodemanager
+# stop historyServer
+"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR  stop historyserver
+

Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/yarn
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/yarn?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/yarn (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/yarn Thu Mar 24 04:31:18 2011
@@ -294,7 +294,7 @@ elif [ "$COMMAND" = "nodemanager" ] ; th
     YARN_OPTS="$YARN_OPTS -server $YARN_NODEMANAGER_OPTS"
   fi
 elif [ "$COMMAND" = "historyserver" ] ; then
-  CLASS=org.apache.hadoop.yarn.mapreduce.hs.JobHistoryServer
+  CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
   YARN_OPTS="$YARN_OPTS $YARN_JOB_HISTORYSERVER_OPTS"
 elif [ "$COMMAND" = "job" ] ; then
   CLASS=org.apache.hadoop.mapred.JobClient