You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/03/24 05:31:18 UTC
svn commit: r1084841 - in /hadoop/mapreduce/branches/MR-279:
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
mr-client/h...
Author: mahadev
Date: Thu Mar 24 04:31:18 2011
New Revision: 1084841
URL: http://svn.apache.org/viewvc?rev=1084841&view=rev
Log:
MAPREDUCE-2403. MR-279: Improve job history event handling in the AM to log to HDFS. Contributed by Krishna Ramachandran.
Modified:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh
hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh
hadoop/mapreduce/branches/MR-279/yarn/bin/yarn
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 24 04:31:18 2011
@@ -18,28 +18,30 @@
package org.apache.hadoop.mapreduce.jobhistory;
-import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.yarn.service.AbstractService;
/**
* The job history events get routed to this class. This class writes the
@@ -48,21 +50,23 @@ import org.apache.hadoop.mapreduce.jobhi
* JobHistory implementation is in this package to access package private
* classes.
*/
-public class JobHistoryEventHandler
+public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
private FileContext logDirFc; // log Dir FileContext
private FileContext doneDirFc; // done Dir FileContext
- private Configuration conf;
- private Path logDir = null;
- private Path done = null; // folder for completed jobs
+ private Path logDirPath = null;
+ private Path doneDirPrefixPath = null; // folder for completed jobs
+
+ private BlockingQueue<JobHistoryEvent> eventQueue =
+ new LinkedBlockingQueue<JobHistoryEvent>();
+ private Thread eventHandlingThread;
+ private volatile boolean stopped;
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
- private EventWriter eventWriter = null;
-
private static final Map<JobID, MetaInfo> fileMap =
Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
@@ -72,42 +76,81 @@ public class JobHistoryEventHandler
public static final FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0740); // rwxr-----
- public JobHistoryEventHandler(Configuration conf) {
- this.conf = conf;
-/*
- String localDir = conf.get("yarn.server.nodemanager.jobhistory",
- "file:///" +
- new File(System.getProperty("yarn.log.dir")).getAbsolutePath() +
- File.separator + "history");
-*/
- String localDir = conf.get("yarn.server.nodemanager.jobhistory.localdir",
- "file:///tmp/yarn");
- logDir = new Path(localDir);
- String doneLocation =
- conf.get("yarn.server.nodemanager.jobhistory",
- "file:///tmp/yarn/done");
- if (doneLocation != null) {
- try {
- done = FileContext.getFileContext(conf).makeQualified(new Path(doneLocation));
- doneDirFc = FileContext.getFileContext(done.toUri(), conf);
- if (!doneDirFc.util().exists(done))
- doneDirFc.mkdir(done,
- new FsPermission(HISTORY_DIR_PERMISSION), true);
- } catch (IOException e) {
+ public JobHistoryEventHandler() {
+ super("JobHistoryEventHandler");
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ String defaultLogDir = conf.get(
+ YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
+ String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+ defaultLogDir);
+ String defaultDoneDir = conf.get(
+ YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+ String doneDirPrefix =
+ conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+ defaultDoneDir);
+ try {
+ doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(doneDirPrefix));
+ doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
+ if (!doneDirFc.util().exists(doneDirPrefixPath)) {
+ doneDirFc.mkdir(doneDirPrefixPath,
+ new FsPermission(HISTORY_DIR_PERMISSION), true);
+ }
+ } catch (IOException e) {
LOG.info("error creating done directory on dfs " + e);
throw new YarnException(e);
- }
}
try {
- logDirFc = FileContext.getFileContext(logDir.toUri(), conf);
- if (!logDirFc.util().exists(logDir)) {
- logDirFc.mkdir(logDir, new FsPermission(HISTORY_DIR_PERMISSION), true);
+ logDirPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(logDir));
+ logDirFc = FileContext.getFileContext(logDirPath.toUri(), conf);
+ if (!logDirFc.util().exists(logDirPath)) {
+ logDirFc.mkdir(logDirPath,
+ new FsPermission(HISTORY_DIR_PERMISSION), true);
}
} catch (IOException ioe) {
LOG.info("Mkdirs failed to create " +
- logDir.toString());
+ logDirPath.toString());
throw new YarnException(ioe);
}
+ super.init(conf);
+ start();
+ }
+
+ @Override
+ public void start() {
+ eventHandlingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ JobHistoryEvent event = null;
+ while (!stopped || !Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ return;
+ }
+ handleEvent(event);
+ }
+ }
+ });
+ eventHandlingThread.start();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ eventHandlingThread.interrupt();
+ try {
+ eventHandlingThread.join();
+ } catch (InterruptedException ie) {
+ LOG.info("Interruped Exception while stopping", ie);
+ }
+ super.stop();
}
/**
@@ -118,7 +161,7 @@ public class JobHistoryEventHandler
*/
protected void setupEventWriter(JobID jobId)
throws IOException {
- if (logDir == null) {
+ if (logDirPath == null) {
throw new IOException("Missing Log Directory for History");
}
@@ -126,9 +169,9 @@ public class JobHistoryEventHandler
long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
- Path logFile = getJobHistoryFile(logDir, jobId);
+ Path logFile = getJobHistoryFile(logDirPath, jobId);
// String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
- String user = conf.get(MRJobConfig.USER_NAME);
+ String user = getConfig().get(MRJobConfig.USER_NAME);
if (user == null) {
throw new IOException("User is null while setting up jobhistory eventwriter" );
}
@@ -145,7 +188,6 @@ public class JobHistoryEventHandler
throw ioe;
}
}
- this.eventWriter = writer;
/*TODO Storing the job conf on the log dir if required*/
MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
fileMap.put(jobId, fi);
@@ -165,7 +207,16 @@ public class JobHistoryEventHandler
}
}
- public synchronized void handle(JobHistoryEvent event) {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ protected void handleEvent(JobHistoryEvent event) {
// check for first event from a job
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
try {
@@ -180,6 +231,7 @@ public class JobHistoryEventHandler
try {
HistoryEvent historyEvent = event.getHistoryEvent();
mi.writeEvent(historyEvent);
+ LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
} catch (IOException e) {
LOG.error("in handler ioException " + e);
throw new YarnException(e);
@@ -187,7 +239,7 @@ public class JobHistoryEventHandler
// check for done
if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
- String statusstoredir = done + "/status/" + mi.user + "/" + mi.jobName;
+ String statusstoredir = doneDirPrefixPath + "/status/" + mi.user + "/" + mi.jobName;
try {
writeStatus(statusstoredir, jfe);
} catch (IOException e) {
@@ -205,23 +257,26 @@ public class JobHistoryEventHandler
protected void closeEventWriter(JobID jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
try {
- Path fromLocalFile = mi.getHistoryFile();
- // Path toPath = new Path(done, mi.jobName);
- String jobhistorydir = done + "/" + mi.user + "/";
- Path jobhistorydirpath =
- logDirFc.makeQualified(new Path(jobhistorydir));
- logDirFc.mkdir(jobhistorydirpath,
- new FsPermission(HISTORY_DIR_PERMISSION), true);
+ Path logFile = mi.getHistoryFile();
+ //TODO fix - add indexed structure
+ //
+ String doneDir = doneDirPrefixPath + "/" + mi.user + "/";
+ Path doneDirPath =
+ doneDirFc.makeQualified(new Path(doneDir));
+ if (!pathExists(doneDirFc, doneDirPath)) {
+ doneDirFc.mkdir(doneDirPath, new FsPermission(HISTORY_DIR_PERMISSION),
+ true);
+ }
// Path localFile = new Path(fromLocalFile);
- Path localQualifiedFile =
- logDirFc.makeQualified(fromLocalFile);
- Path jobHistoryFile =
- logDirFc.makeQualified(new Path(jobhistorydirpath, mi.jobName));
+ Path qualifiedLogFile =
+ logDirFc.makeQualified(logFile);
+ Path qualifiedDoneFile =
+ doneDirFc.makeQualified(new Path(doneDirPath, mi.jobName));
if (mi != null) {
mi.closeWriter();
}
- moveToDoneNow(localQualifiedFile, jobHistoryFile);
- logDirFc.delete(localQualifiedFile, true);
+ moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+ logDirFc.delete(qualifiedLogFile, true);
} catch (IOException e) {
LOG.info("Error closing writer for JobID: " + jobId);
throw e;
@@ -287,7 +342,11 @@ public class JobHistoryEventHandler
doneDirFc.setPermission(toPath,
new FsPermission(HISTORY_FILE_PERMISSION));
}
- }
+ }
+
+ boolean pathExists(FileContext fc, Path path) throws IOException {
+ return fc.util().exists(path);
+ }
private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
try {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Mar 24 04:31:18 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -63,6 +64,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -71,8 +73,6 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
/**
* The Map-Reduce Application Master.
@@ -150,6 +150,10 @@ public class MRAppMaster extends Composi
containerAllocator = createContainerAllocator(clientService, context);
addIfService(containerAllocator);
+ //service to log job history events
+ EventHandler<JobHistoryEvent> historyService =
+ createJobHistoryHandler(conf);
+
//register the event dispatchers
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
@@ -158,6 +162,8 @@ public class MRAppMaster extends Composi
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ historyService);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -189,13 +195,9 @@ public class MRAppMaster extends Composi
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
Configuration conf) {
- return new EventHandler<JobHistoryEvent>() {
- @Override
- public void handle(JobHistoryEvent event) {
- }
- };
- //TODO use the real job history handler.
- //return new JobHistoryEventHandler(conf);
+ JobHistoryEventHandler eventHandler = new JobHistoryEventHandler();
+ eventHandler.init(conf);
+ return eventHandler;
}
protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -366,8 +368,6 @@ public class MRAppMaster extends Composi
taskAttemptListener, jobTokenSecretManager, fsTokens);
((RunningAppContext) context).jobs.put(job.getID(), job);
- dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
- createJobHistoryHandler(config));
dispatcher.register(JobFinishEvent.Type.class,
new EventHandler<JobFinishEvent>() {
@Override
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Mar 24 04:31:18 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.WrappedJ
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -87,6 +88,10 @@ public class MRApp extends MRAppMaster {
public Job submit(Configuration conf) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, "mapred");
conf.set(MRJobConfig.USER_NAME, user);
+ conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+ "file:///tmp/yarn/");
+ conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+ "file:///tmp/yarn/done/");
init(conf);
start();
Job job = getContext().getAllJobs().values().iterator().next();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Thu Mar 24 04:31:18 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -43,11 +44,13 @@ import org.junit.Test;
public class TestJobHistoryParsing {
private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
-
+ //TODO FIX once final CompletedStatusStore is available
+ private static final String STATUS_STORE_DIR_KEY =
+ "yarn.server.nodemanager.jobstatus";
@Test
public void testHistoryParsing() throws Exception {
Configuration conf = new Configuration();
- MRApp app = new HistoryEnabledApp(2, 1, true);
+ MRApp app = new MRApp(2, 1, true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobID jobId = job.getID();
@@ -57,9 +60,9 @@ public class TestJobHistoryParsing {
String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
String user =
conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
- String jobhistoryDir = conf.get("yarn.server.nodemanager.jobhistory",
+ String jobhistoryDir = conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
"file:///tmp/yarn/done/") + user;
- String jobstatusDir = conf.get("yarn.server.nodemanager.jobhistory",
+ String jobstatusDir = conf.get(STATUS_STORE_DIR_KEY,
"file:///tmp/yarn/done/status/") + user + "/" +
jobhistoryFileName;
FSDataInputStream in = null;
@@ -123,10 +126,9 @@ public class TestJobHistoryParsing {
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
Configuration conf) {
- return new JobHistoryEventHandler(conf);
+ return new JobHistoryEventHandler();
}
}
-
public static void main(String[] args) throws Exception {
TestJobHistoryParsing t = new TestJobHistoryParsing();
t.testHistoryParsing();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Thu Mar 24 04:31:18 2011
@@ -35,4 +35,16 @@ public class YarnMRJobConfig {
= "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
= "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
+ public static final String HS_PREFIX = "yarn.server.historyserver.";
+
+ public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
+
+ /** host:port address to which to bind to **/
+ public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
+
+ public static final String HISTORY_STAGING_DIR_KEY =
+ "yarn.historyfile.stagingDir";
+
+ public static final String HISTORY_DONE_DIR_KEY =
+ "yarn.historyfile.doneDir";
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Mar 24 04:31:18 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.TaskID;
import org.apache.hadoop.mapreduce.v2.api.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
/**
* This module is responsible for talking to the
@@ -75,8 +76,8 @@ public class HistoryClientService extend
public void start() {
Configuration conf = new Configuration(getConfig());
YarnRPC rpc = YarnRPC.create(conf);
- String serviceAddr = conf.get("jobhistory.server.hostname") + ":"
- + conf.get("jobhistory.server.port");
+ String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
+ YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr);
InetAddress hostNameResolved = null;
try {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Thu Mar 24 04:31:18 2011
@@ -27,11 +27,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.v2.app.MRApp;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.JobID;
import org.apache.hadoop.mapreduce.v2.api.JobState;
import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
@@ -39,6 +35,11 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.TaskID;
import org.apache.hadoop.mapreduce.v2.api.TaskState;
import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@@ -48,8 +49,8 @@ public class TestJobHistoryEvents {
@Test
public void testHistoryEvents() throws Exception {
Configuration conf = new Configuration();
- conf.set("mapreduce.job.user.name", "test");
- MRApp app = new HistoryEnabledApp(2, 1, true);
+ conf.set(MRJobConfig.USER_NAME, "test");
+ MRApp app = new MRApp(2, 1, true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobID jobId = job.getID();
@@ -108,7 +109,7 @@ public class TestJobHistoryEvents {
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
Configuration conf) {
- return new JobHistoryEventHandler(conf);
+ return new JobHistoryEventHandler();
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 24 04:31:18 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.YarnRemote
import org.apache.hadoop.mapreduce.v2.api.JobReport;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -71,8 +72,8 @@ public class ClientServiceDelegate {
private void refreshProxy() throws AvroRemoteException {
ApplicationMaster appMaster = rm.getApplicationMaster(appId);
if (ApplicationState.COMPLETED.equals(appMaster.state)) {
- serviceAddr = conf.get("jobhistory.server.hostname") + ":"
- + conf.get("jobhistory.server.port");
+ String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
+ YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
LOG.debug("Reconnecting to job history server " + serviceAddr);
} else {
/* TODO check to confirm its really launched */
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 24 04:31:18 2011
@@ -56,12 +56,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.junit.Test;
public class TestClientRedirect {
private static final Log LOG = LogFactory.getLog(TestClientRedirect.class);
private static final String RMADDRESS = "0.0.0.0:8054";
+ private static final String AMHOSTADDRESS = "0.0.0.0:10020";
private static final String AMHOSTNAME = "0.0.0.0";
private static final int AMPORT = 10020;
private boolean firstRedirect = false;
@@ -72,8 +74,7 @@ public class TestClientRedirect {
Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
- conf.set("jobhistory.server.hostname", AMHOSTNAME);
- conf.setInt("jobhistory.server.port", AMPORT);
+ conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, AMHOSTADDRESS);
RMService rmService = new RMService("test");
rmService.init(conf);
rmService.start();
@@ -182,7 +183,7 @@ public class TestClientRedirect {
public void start(Configuration conf) {
YarnRPC rpc = YarnRPC.create(conf);
//TODO : use fixed port ??
- InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTNAME + ":" + AMPORT);
+ InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTADDRESS);
InetAddress hostNameResolved = null;
try {
address.getAddress();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Mar 24 04:31:18 2011
@@ -70,7 +70,10 @@ public class MiniMRYarnCluster extends M
"apps_staging_dir/${user.name}/").getAbsolutePath());
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
-
+ conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+ "file:///tmp/yarn/");
+ conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+ "file:///tmp/yarn/done/");
//configure the shuffle service in NM
conf.setStrings(AuxServices.AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/start-all.sh Thu Mar 24 04:31:18 2011
@@ -28,3 +28,5 @@ bin=`cd "$bin"; pwd`
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager
# start nodeManager
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR start nodemanager
+# start historyserver
+#"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start historyserver
Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/stop-all.sh Thu Mar 24 04:31:18 2011
@@ -28,3 +28,6 @@ bin=`cd "$bin"; pwd`
"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop resourcemanager
# stop nodeManager
"$bin"/yarn-daemons.sh --config $YARN_CONF_DIR stop nodemanager
+# stop historyServer
+"$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop historyserver
+
Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/yarn
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/yarn?rev=1084841&r1=1084840&r2=1084841&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/yarn (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/yarn Thu Mar 24 04:31:18 2011
@@ -294,7 +294,7 @@ elif [ "$COMMAND" = "nodemanager" ] ; th
YARN_OPTS="$YARN_OPTS -server $YARN_NODEMANAGER_OPTS"
fi
elif [ "$COMMAND" = "historyserver" ] ; then
- CLASS=org.apache.hadoop.yarn.mapreduce.hs.JobHistoryServer
+ CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
YARN_OPTS="$YARN_OPTS $YARN_JOB_HISTORYSERVER_OPTS"
elif [ "$COMMAND" = "job" ] ; then
CLASS=org.apache.hadoop.mapred.JobClient