You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the link itself is not preserved in this plain-text copy).
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC
svn commit: r1196458 [3/19] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/
bin/ conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-cli...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Nov 2 05:34:31 2011
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,17 +33,27 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -70,17 +82,20 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -108,19 +123,27 @@ import org.apache.hadoop.yarn.util.Conve
* The information is shared across different components using AppContext.
*/
+@SuppressWarnings("deprecation")
public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private Clock clock;
- private final long startTime = System.currentTimeMillis();
+ private final long startTime;
+ private final long appSubmitTime;
private String appName;
private final ApplicationAttemptId appAttemptID;
+ private final ContainerId containerID;
+ private final String nmHost;
+ private final int nmPort;
+ private final int nmHttpPort;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
+ private List<AMInfo> amInfos;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
+ private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
@@ -128,39 +151,83 @@ public class MRAppMaster extends Composi
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
+ private JobId jobId;
+ private boolean newApiCommitter;
+ private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
+ private boolean inRecovery = false;
private Job job;
-
- public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
- this(applicationAttemptId, new SystemClock());
+ private Credentials fsTokens = new Credentials(); // Filled during init
+ private UserGroupInformation currentUser; // Will be setup during init
+
+ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+ long appSubmitTime) {
+ this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+ new SystemClock(), appSubmitTime);
}
- public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
+ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+ Clock clock, long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
+ this.startTime = clock.getTime();
+ this.appSubmitTime = appSubmitTime;
this.appAttemptID = applicationAttemptId;
+ this.containerID = containerId;
+ this.nmHost = nmHost;
+ this.nmPort = nmPort;
+ this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@Override
public void init(final Configuration conf) {
+
+ downloadTokensAndSetupUGI(conf);
+
context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
- if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
- && appAttemptID.getAttemptId() > 1) {
- LOG.info("Recovery is enabled. Will try to recover from previous life.");
- Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+
+ newApiCommitter = false;
+ jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+ appAttemptID.getApplicationId().getId());
+ int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ if ((numReduceTasks > 0 &&
+ conf.getBoolean("mapred.reducer.new-api", false)) ||
+ (numReduceTasks == 0 &&
+ conf.getBoolean("mapred.mapper.new-api", false))) {
+ newApiCommitter = true;
+ LOG.info("Using mapred newApiCommitter.");
+ }
+
+ committer = createOutputCommitter(conf);
+ boolean recoveryEnabled = conf.getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+ if (recoveryEnabled && recoverySupportedByCommitter
+ && appAttemptID.getAttemptId() > 1) {
+ LOG.info("Recovery is enabled. "
+ + "Will try to recover from previous life on best effort basis.");
+ recoveryServ = new RecoveryService(appAttemptID, clock,
+ committer);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
- completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+ inRecovery = true;
} else {
+ LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
}
@@ -223,15 +290,165 @@ public class MRAppMaster extends Composi
super.init(conf);
} // end of init()
+  /**
+   * Creates the OutputCommitter for this job.
+   *
+   * With the new (mapreduce) API the committer is obtained from the job's
+   * OutputFormat, using a task-attempt context built for map task 0,
+   * attempt 0. With the old (mapred) API it is instantiated from
+   * "mapred.output.committer.class", defaulting to FileOutputCommitter.
+   *
+   * @param conf the job configuration
+   * @return the committer to use for this job
+   * @throws YarnException if the OutputFormat or committer cannot be created
+   */
+  private OutputCommitter createOutputCommitter(Configuration conf) {
+    // Named differently from the 'committer' field to avoid shadowing it.
+    OutputCommitter outputCommitter = null;
+
+    LOG.info("OutputCommitter set in config "
+        + conf.get("mapred.output.committer.class"));
+
+    if (newApiCommitter) {
+      org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+          MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+          MRBuilderUtils.newTaskAttemptId(taskID, 0);
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+          TypeConverter.fromYarn(attemptID));
+      try {
+        OutputFormat<?, ?> outputFormat = ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(), conf);
+        outputCommitter = outputFormat.getOutputCommitter(taskContext);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status before wrapping so it is not lost.
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      } catch (Exception e) {
+        throw new YarnException(e);
+      }
+    } else {
+      outputCommitter = ReflectionUtils.newInstance(conf.getClass(
+          "mapred.output.committer.class", FileOutputCommitter.class,
+          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+    }
+    LOG.info("OutputCommitter is " + outputCommitter.getClass().getName());
+    return outputCommitter;
+  }
+
+  /**
+   * Decides whether job files should be preserved instead of cleaned up.
+   *
+   * @param conf the job configuration
+   * @return true if a keep-task-files pattern is set or failed task files
+   *         are to be kept
+   */
+  protected boolean keepJobFiles(JobConf conf) {
+    boolean hasKeepPattern = conf.getKeepTaskFilesPattern() != null;
+    return hasKeepPattern || conf.getKeepFailedTaskFiles();
+  }
+
+  /**
+   * Returns the default file system for this job.
+   *
+   * @param conf the configuration used to resolve the file system
+   * @return the default {@link FileSystem} for {@code conf}
+   * @throws IOException if the file system cannot be obtained
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    FileSystem defaultFs = FileSystem.get(conf);
+    return defaultFs;
+  }
+
+  /**
+   * Deletes the job's staging directory (MRJobConfig.MAPREDUCE_JOB_DIR)
+   * unless keepJobFiles() says job files must be kept. IOExceptions raised
+   * while checking or deleting are logged rather than rethrown.
+   *
+   * @throws IOException if the file system itself cannot be obtained
+   */
+  public void cleanupStagingDir() throws IOException {
+    Configuration jobConf = getConfig();
+    FileSystem fs = getFileSystem(jobConf);
+    String stagingDir = null;
+    try {
+      if (keepJobFiles(new JobConf(jobConf))) {
+        return;
+      }
+      stagingDir = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+      if (stagingDir == null) {
+        LOG.warn("Job Staging directory is null");
+        return;
+      }
+      Path stagingPath = new Path(stagingDir);
+      LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(jobConf)
+          + " " + stagingDir);
+      fs.delete(stagingPath, true);
+    } catch (IOException io) {
+      LOG.error("Failed to cleanup staging dir " + stagingDir, io);
+    }
+  }
+
+  /**
+   * Terminates the JVM with exit status 0. Isolated in its own method so
+   * that tests can override it instead of actually exiting.
+   */
+  protected void sysexit() {
+    final int exitStatus = 0;
+    System.exit(exitStatus);
+  }
+
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    /**
+     * Shuts down the AppMaster once its job has finished. It waits briefly,
+     * stops all services, sends the job-end notification, cleans up the
+     * staging directory and finally calls sysexit().
+     */
+    @Override
+    public void handle(JobFinishEvent event) {
+      // job has finished
+      // this is the only job, so shut down the Appmaster
+      // note in a workflow scenario, this may lead to creation of a new
+      // job (FIXME?)
+
+      // Remember interrupts instead of restoring them immediately: a set
+      // interrupt flag could abort the graceful stop and the job-end
+      // notification below. It is restored just before exiting.
+      boolean interrupted = false;
+
+      // TODO:currently just wait for some time so clients can know the
+      // final states. Will be removed once RM come on.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting before AppMaster shutdown", e);
+        interrupted = true;
+      }
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        LOG.info("Calling stop for all the services");
+        stop();
+
+        // Send job-end notification
+        try {
+          LOG.info("Job end notification started for jobID : "
+              + job.getReport().getJobId());
+          JobEndNotifier notifier = new JobEndNotifier();
+          notifier.setConf(getConfig());
+          notifier.notify(job.getReport());
+        } catch (InterruptedException ie) {
+          LOG.warn("Job end notification interrupted for jobID : "
+              + job.getReport().getJobId(), ie );
+          interrupted = true;
+        }
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+
+      // Cleanup staging directory
+      try {
+        cleanupStagingDir();
+      } catch(IOException io) {
+        LOG.warn("Failed to delete staging dir", io);
+      }
+
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+  }
+
+  /**
+   * Creates the handler invoked when the job finishes. Subclasses may
+   * override this to supply a different handler.
+   *
+   * @return a new job-finish event handler
+   */
+  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
+    EventHandler<JobFinishEvent> handler = new JobFinishEventHandler();
+    return handler;
+  }
+
/** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) {
- // ////////// Obtain the tokens needed by the job. //////////
- Credentials fsTokens = new Credentials();
- UserGroupInformation currentUser = null;
+ // create single job
+ Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
+ .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
+ fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
+ newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
+ ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
+
+ dispatcher.register(JobFinishEvent.Type.class,
+ createJobFinishEventHandler());
+ return newJob;
+ } // end createJob()
+
+
+ /**
+ * Obtain the tokens needed by the job and add them to the current UGI.
+ * @param conf the configuration passed to init()
+ */
+ protected void downloadTokensAndSetupUGI(Configuration conf) {
try {
- currentUser = UserGroupInformation.getCurrentUser();
+ this.currentUser = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) {
// Read the file-system tokens from the localized tokens-file.
@@ -246,56 +463,18 @@ public class MRAppMaster extends Composi
+ jobTokenFile);
for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
- LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
- + "in current ugi in the AppMaster for service "
- + tk.getService());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Token of kind " + tk.getKind()
+ + "in current ugi in the AppMaster for service "
+ + tk.getService());
+ }
currentUser.addToken(tk); // For use by AppMaster itself.
}
}
} catch (IOException e) {
throw new YarnException(e);
}
- // ////////// End of obtaining the tokens needed by the job. //////////
-
- // create single job
- Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
- taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
- completedTasksFromPreviousRun, metrics, currentUser.getUserName());
- ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
-
- dispatcher.register(JobFinishEvent.Type.class,
- new EventHandler<JobFinishEvent>() {
- @Override
- public void handle(JobFinishEvent event) {
- // job has finished
- // this is the only job, so shut down the Appmaster
- // note in a workflow scenario, this may lead to creation of a new
- // job (FIXME?)
-
- // TODO:currently just wait for some time so clients can know the
- // final states. Will be removed once RM come on.
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- LOG.info("Calling stop for all the services");
- try {
- stop();
- } catch (Throwable t) {
- LOG.warn("Graceful stop failed ", t);
- }
- //TODO: this is required because rpc server does not shut down
- // in spite of calling server.stop().
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- LOG.info("Exiting MR AppMaster..GoodBye!");
- System.exit(0);
- }
- });
-
- return newJob;
- } // end createJob()
+ }
protected void addIfService(Object object) {
if (object instanceof Service) {
@@ -373,6 +552,22 @@ public class MRAppMaster extends Composi
return appAttemptID.getApplicationId();
}
+  /** @return the application attempt ID of this AppMaster. */
+  public ApplicationAttemptId getAttemptID() {
+    return this.appAttemptID;
+  }
+
+  /** @return the job ID (assigned during init). */
+  public JobId getJobId() {
+    return this.jobId;
+  }
+
+  /** @return the OutputCommitter created during init. */
+  public OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  /** @return true if the new (mapreduce) API committer is in use. */
+  public boolean isNewApiCommitter() {
+    return this.newApiCommitter;
+  }
+
public int getStartCount() {
return appAttemptID.getAttemptId();
}
@@ -389,6 +584,10 @@ public class MRAppMaster extends Composi
return completedTasksFromPreviousRun;
}
+  /**
+   * Returns the AMInfo records of previous AppMaster attempts (when
+   * recovered) plus the current one. Populated in start(); null before that.
+   */
+  public List<AMInfo> getAllAMInfos() {
+    return this.amInfos;
+  }
+
public ContainerAllocator getContainerAllocator() {
return containerAllocator;
}
@@ -522,6 +721,7 @@ public class MRAppMaster extends Composi
return jobs;
}
+ @SuppressWarnings("rawtypes")
@Override
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
@@ -538,13 +738,39 @@ public class MRAppMaster extends Composi
}
}
+ @SuppressWarnings("unchecked")
@Override
public void start() {
- ///////////////////// Create the job itself.
+ // Pull completedTasks etc from recovery
+ if (inRecovery) {
+ completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
+ amInfos = recoveryServ.getAMInfos();
+ }
+
+ // Create the AMInfo for the current AppMaster
+ if (amInfos == null) {
+ amInfos = new LinkedList<AMInfo>();
+ }
+ AMInfo amInfo =
+ MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
+ nmPort, nmHttpPort);
+ amInfos.add(amInfo);
+
+ // /////////////////// Create the job itself.
job = createJob(getConfig());
+
// End of creating the job.
+ // Send out an MR AM inited event for this AM and all previous AMs.
+ for (AMInfo info : amInfos) {
+ dispatcher.getEventHandler().handle(
+ new JobHistoryEvent(job.getID(), new AMStartedEvent(info
+ .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
+ info.getNodeManagerHost(), info.getNodeManagerPort(), info
+ .getNodeManagerHttpPort())));
+ }
+
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
@@ -590,6 +816,7 @@ public class MRAppMaster extends Composi
* In a typical workflow, one presumably would want to uberize only a subset
* of the jobs (the "small" ones), which is awkward with the current design.
*/
+ @SuppressWarnings("unchecked")
protected void startJobs() {
/** create a job-start event to get this ball rolling */
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
@@ -598,6 +825,7 @@ public class MRAppMaster extends Composi
}
private class JobEventDispatcher implements EventHandler<JobEvent> {
+ @SuppressWarnings("unchecked")
@Override
public void handle(JobEvent event) {
((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
@@ -605,6 +833,7 @@ public class MRAppMaster extends Composi
}
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
Task task = context.getJob(event.getTaskID().getJobId()).getTask(
@@ -615,6 +844,7 @@ public class MRAppMaster extends Composi
private class TaskAttemptEventDispatcher
implements EventHandler<TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
@@ -640,19 +870,44 @@ public class MRAppMaster extends Composi
}
}
+  /**
+   * Ensures a required environment value is present.
+   *
+   * @param value the value read from the environment, possibly null
+   * @param param the environment variable name, used in the error message
+   * @throws IOException if {@code value} is null
+   */
+  private static void validateInputParam(String value, String param)
+      throws IOException {
+    if (value != null) {
+      return;
+    }
+    String msg = param + " is null";
+    LOG.error(msg);
+    throw new IOException(msg);
+  }
+
public static void main(String[] args) {
try {
- String applicationAttemptIdStr = System
- .getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
- if (applicationAttemptIdStr == null) {
- String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV
- + " is null";
- LOG.error(msg);
- throw new IOException(msg);
- }
- ApplicationAttemptId applicationAttemptId = ConverterUtils
- .toApplicationAttemptId(applicationAttemptIdStr);
- MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
+ String containerIdStr =
+ System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+ String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+ String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+ String nodeHttpPortString =
+ System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+ String appSubmitTimeStr =
+ System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ validateInputParam(containerIdStr,
+ ApplicationConstants.AM_CONTAINER_ID_ENV);
+ validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+ validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+ validateInputParam(nodeHttpPortString,
+ ApplicationConstants.NM_HTTP_PORT_ENV);
+ validateInputParam(appSubmitTimeStr,
+ ApplicationConstants.APP_SUBMIT_TIME_ENV);
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId =
+ containerId.getApplicationAttemptId();
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+
+ MRAppMaster appMaster =
+ new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString), appSubmitTime);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java Wed Nov 2 05:34:31 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.yarn.proto.MRClientProtocol;
-import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
+import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
public class MRClientSecurityInfo extends SecurityInfo {
@@ -51,7 +51,7 @@ public class MRClientSecurityInfo extend
@Override
public Class<? extends TokenSelector<? extends TokenIdentifier>>
value() {
- return ApplicationTokenSelector.class;
+ return ClientTokenSelector.class;
}
};
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Wed Nov 2 05:34:31 2011
@@ -24,12 +24,35 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+/**
+ * Implementations listen for changes to the state of a Task.
+ */
public interface TaskAttemptListener {
InetSocketAddress getAddress();
- void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+ /**
+ * Register a JVM with the listener. This should be called as soon as a
+ * JVM ID is assigned to a task attempt, before it has been launched.
+ * @param jvmID The ID of the JVM.
+ */
+ void registerPendingTask(WrappedJvmID jvmID);
+
+ /**
+ * Register the task and task attempt with the JVM. This should be called
+ * when the JVM has been launched.
+ * @param attemptID the id of the attempt for this JVM.
+ * @param task the task itself for this JVM.
+ * @param jvmID the id of the JVM handling the task.
+ */
+ void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+ /**
+ * Unregister the JVM and the attempt associated with it. This should be
+ * called when the attempt/JVM has finished executing and is being cleaned up.
+ * @param attemptID the ID of the attempt.
+ * @param jvmID the ID of the JVM for that attempt.
+ */
void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Wed Nov 2 05:34:31 2011
@@ -71,6 +71,7 @@ public class TaskHeartbeatHandler extend
@Override
public void start() {
lostTaskCheckerThread = new Thread(new PingChecker());
+ lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker");
lostTaskCheckerThread.start();
super.start();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Wed Nov 2 05:34:31 2011
@@ -18,20 +18,18 @@
package org.apache.hadoop.mapreduce.v2.app.client;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.security.AccessControlException;
import java.util.Arrays;
import java.util.Collection;
-import org.apache.avro.ipc.Server;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
@@ -72,21 +70,20 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@@ -131,8 +128,8 @@ public class MRClientService extends Abs
System
.getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
byte[] bytes = Base64.decodeBase64(secretKeyStr);
- ApplicationTokenIdentifier identifier =
- new ApplicationTokenIdentifier(this.appContext.getApplicationID());
+ ClientTokenIdentifier identifier = new ClientTokenIdentifier(
+ this.appContext.getApplicationID());
secretManager.setMasterKey(identifier, bytes);
}
server =
@@ -140,6 +137,14 @@ public class MRClientService extends Abs
conf, secretManager,
conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT));
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ refreshServiceAcls(conf, new MRAMPolicyProvider());
+ }
+
server.start();
this.bindAddress =
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
@@ -154,8 +159,13 @@ public class MRClientService extends Abs
super.start();
}
+  /**
+   * Refreshes the service-level authorization ACLs of the RPC server from
+   * the given configuration and policy provider.
+   */
+  void refreshServiceAcls(Configuration configuration,
+      PolicyProvider policyProvider) {
+    server.refreshServiceAcl(configuration, policyProvider);
+  }
+
public void stop() {
- server.close();
+ server.stop();
if (webApp != null) {
webApp.stop();
}
@@ -183,13 +193,6 @@ public class MRClientService extends Abs
if (job == null) {
throw RPCUtil.getRemoteException("Unknown job " + jobID);
}
- //TODO fix job acls.
- //JobACL operation = JobACL.VIEW_JOB;
- //if (modifyAccess) {
- // operation = JobACL.MODIFY_JOB;
- //}
- //TO disable check access ofr now.
- //checkAccess(job, operation);
return job;
}
@@ -213,24 +216,6 @@ public class MRClientService extends Abs
return attempt;
}
- private void checkAccess(Job job, JobACL jobOperation)
- throws YarnRemoteException {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
- }
- UserGroupInformation callerUGI;
- try {
- callerUGI = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
- }
- if(!job.checkAccess(callerUGI, jobOperation)) {
- throw RPCUtil.getRemoteException(new AccessControlException("User "
- + callerUGI.getShortUserName() + " cannot perform operation "
- + jobOperation.name() + " on " + job.getID()));
- }
- }
-
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
@@ -291,6 +276,7 @@ public class MRClientService extends Abs
return response;
}
+ @SuppressWarnings("unchecked")
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
@@ -307,6 +293,7 @@ public class MRClientService extends Abs
return response;
}
+ @SuppressWarnings("unchecked")
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
@@ -321,6 +308,7 @@ public class MRClientService extends Abs
return response;
}
+ @SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
@@ -350,6 +338,7 @@ public class MRClientService extends Abs
return response;
}
+ @SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Wed Nov 2 05:34:31 2011
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -68,5 +69,10 @@ public interface Job {
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+ /**
+ * @return information for MR AppMasters (previously failed and current)
+ */
+ List<AMInfo> getAMInfos();
+
+ /**
+ * Checks whether the given caller may perform the given job operation.
+ * @param callerUGI identity of the caller being checked
+ * @param jobOperation the job ACL being checked (e.g. VIEW_JOB or MODIFY_JOB)
+ * @return true if the caller is allowed to perform the operation,
+ *         false otherwise
+ */
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Wed Nov 2 05:34:31 2011
@@ -58,6 +58,11 @@ public interface TaskAttempt {
* @return node's http address if a container is assigned, otherwise null.
*/
String getNodeHttpAddress();
+
+ /**
+ * @return the rack name (network location) of the node hosting the
+ * assigned container, or null if no container has been assigned yet.
+ */
+ String getNodeRackName();
/**
* @return time at which container is launched. If container is not launched
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java Wed Nov 2 05:34:31 2011
@@ -18,22 +18,29 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
+import java.util.Map;
+
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
-
-
+/**
+ * Event of type TA_ASSIGNED telling a task attempt which container it has
+ * been assigned. It also carries the application ACLs, which the attempt
+ * passes on when it builds its ContainerLaunchContext.
+ */
public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
+ // Container allocated to the task attempt.
private final Container container;
+ // Application ACLs keyed by access type; stored by reference, not copied.
+ private final Map<ApplicationAccessType, String> applicationACLs;
+ /**
+ * @param id the task attempt the container is assigned to
+ * @param container the allocated container
+ * @param applicationACLs application ACLs to attach to the container
+ *        launch context (kept by reference)
+ */
public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
- Container container) {
+ Container container, Map<ApplicationAccessType, String> applicationACLs) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.container = container;
+ this.applicationACLs = applicationACLs;
}
+ /** @return the container assigned to the task attempt */
public Container getContainer() {
return this.container;
}
+
+ /** @return the application ACLs given to the constructor (not a copy) */
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return this.applicationACLs;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java Wed Nov 2 05:34:31 2011
@@ -48,7 +48,6 @@ public class TaskAttemptStatusUpdateEven
public TaskAttemptId id;
public float progress;
public Counters counters;
- public String diagnosticInfo;
public String stateString;
public Phase phase;
public long outputSize;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Nov 2 05:34:31 2011
@@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -63,7 +60,7 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@@ -97,14 +94,11 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -125,21 +119,21 @@ public class JobImpl implements org.apac
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
//final fields
private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
+ private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun;
+ private final List<AMInfo> amInfos;
private final Lock readLock;
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
+ private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Object tasksSyncHandle = new Object();
@@ -148,6 +142,7 @@ public class JobImpl implements org.apac
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
+ private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@@ -164,7 +159,6 @@ public class JobImpl implements org.apac
private Path remoteJobSubmitDir;
public Path remoteJobConfFile;
private JobContext jobContext;
- private OutputCommitter committer;
private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@@ -339,7 +333,6 @@ public class JobImpl implements org.apac
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
-
// create the topology tables
.installTopology();
@@ -355,7 +348,6 @@ public class JobImpl implements org.apac
private int failedReduceTaskCount = 0;
private int killedMapTaskCount = 0;
private int killedReduceTaskCount = 0;
- private long submitTime;
private long startTime;
private long finishTime;
private float setupProgress;
@@ -366,29 +358,27 @@ public class JobImpl implements org.apac
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
- public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
- EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
- Credentials fsTokenCredentials, Clock clock,
+ Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
- String userName) {
+ OutputCommitter committer, boolean newApiCommitter, String userName,
+ long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
- this.jobId = recordFactory.newRecordInstance(JobId.class);
+ this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
+ this.amInfos = amInfos;
this.userName = userName;
- ApplicationId applicationId = applicationAttemptId.getApplicationId();
- jobId.setAppId(applicationId);
- jobId.setId(applicationId.getId());
- oldJobId = TypeConverter.fromYarn(jobId);
- LOG.info("Job created" +
- " appId=" + applicationId +
- " jobId=" + jobId +
- " oldJobId=" + oldJobId);
-
+ this.appSubmitTime = appSubmitTime;
+ this.oldJobId = TypeConverter.fromYarn(jobId);
+ this.newApiCommitter = newApiCommitter;
+
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -397,6 +387,7 @@ public class JobImpl implements org.apac
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
+ this.committer = committer;
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
@@ -589,13 +580,14 @@ public class JobImpl implements org.apac
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
- startTime, finishTime, setupProgress, 0.0f,
- 0.0f, cleanupProgress, remoteJobConfFile.toString());
+ appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
+ cleanupProgress, remoteJobConfFile.toString(), amInfos);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
- startTime, finishTime, setupProgress, computeProgress(mapTasks),
- computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
+ appSubmitTime, startTime, finishTime, setupProgress,
+ computeProgress(mapTasks), computeProgress(reduceTasks),
+ cleanupProgress, remoteJobConfFile.toString(), amInfos);
} finally {
readLock.unlock();
}
@@ -724,6 +716,16 @@ public class JobImpl implements org.apac
this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
}
+ /**
+ * Returns the default file system for this job's configuration by
+ * delegating to FileSystem.get(conf).
+ * InitTransition uses it to set job.fs. It is a protected method rather than
+ * an inlined call, presumably so subclasses/tests can supply a different
+ * FileSystem; confirm.
+ * @param conf the conf object
+ * @return the default filesystem for this job
+ * @throws IOException if FileSystem.get cannot obtain the file system
+ */
+ protected FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(conf);
+ }
+
static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success
if (job.completedTaskCount == job.getTasks().size()) {
@@ -733,7 +735,6 @@ public class JobImpl implements org.apac
} catch (IOException e) {
LOG.warn("Could not do commit for Job", e);
}
-
job.logJobHistoryFinishedEvent();
return job.finished(JobState.SUCCEEDED);
}
@@ -798,6 +799,11 @@ public class JobImpl implements org.apac
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.unmodifiableMap(jobACLs);
}
+
+ /**
+ * @return the AMInfo list passed to the constructor, returned as-is
+ * (not copied or wrapped).
+ * NOTE(review): unlike getJobACLs(), which returns an unmodifiable view,
+ * callers receive JobImpl's own list and could mutate it; confirm
+ * whether that is intended.
+ */
+ @Override
+ public List<AMInfo> getAMInfos() {
+ return amInfos;
+ }
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@@ -811,18 +817,17 @@ public class JobImpl implements org.apac
*/
@Override
public JobState transition(JobImpl job, JobEvent event) {
- job.submitTime = job.clock.getTime();
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
try {
setup(job);
- job.fs = FileSystem.get(job.conf);
+ job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
- job.submitTime,
+ job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
@@ -838,60 +843,30 @@ public class JobImpl implements org.apac
checkTaskLimits();
-
- boolean newApiCommitter = false;
- if ((job.numReduceTasks > 0 &&
- job.conf.getBoolean("mapred.reducer.new-api", false)) ||
- (job.numReduceTasks == 0 &&
- job.conf.getBoolean("mapred.mapper.new-api", false))) {
- newApiCommitter = true;
- LOG.info("Using mapred newApiCommitter.");
- }
-
- LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
-
- if (newApiCommitter) {
+ if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
- .getRecordFactory(null)
- .newRecordInstance(
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
- attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
- .newRecordInstance(TaskId.class));
- attemptID.getTaskId().setJobId(job.jobId);
- attemptID.getTaskId().setTaskType(TaskType.MAP);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
- TypeConverter.fromYarn(attemptID));
- try {
- OutputFormat outputFormat = ReflectionUtils.newInstance(
- taskContext.getOutputFormatClass(), job.conf);
- job.committer = outputFormat.getOutputCommitter(taskContext);
- } catch(Exception e) {
- throw new IOException("Failed to assign outputcommitter", e);
- }
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId);
- job.committer = ReflectionUtils.newInstance(
- job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
}
- LOG.info("OutputCommitter is " + job.committer.getClass().getName());
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
-//FIXME: need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
+ //FIXME: need new memory criterion for uber-decision (oops, too late here;
+ // until AM-resizing supported, must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
int sysMaxReduces =
job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
- job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
- //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+ job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
+ // wrong; get FS from [File?]InputFormat and default block size from that
+ //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
+ // FIXME [could use default AM-container memory size...]
boolean uberEnabled =
job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -900,8 +875,8 @@ public class JobImpl implements org.apac
boolean smallInput = (inputLength <= sysMaxBytes);
boolean smallMemory = true; //FIXME (see above)
// ignoring overhead due to UberTask and statics as negligible here:
-// FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
-// || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
+ // FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+ // || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
boolean notChainJob = !isChainJob(job.conf);
// User has overall veto power over uberization, or user can modify
@@ -935,7 +910,9 @@ public class JobImpl implements org.apac
job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation: makes no sense to speculate an entire job
-// canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
+ //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
+ //version, ultimately was from conf.getMapSpeculativeExecution(),
+ //conf.getReduceSpeculativeExecution()]
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(job.jobId).append(" because:");
@@ -1022,13 +999,6 @@ public class JobImpl implements org.apac
if (UserGroupInformation.isSecurityEnabled()) {
tokenStorage.addAll(job.fsTokens);
}
-
- Path remoteJobTokenFile =
- new Path(job.remoteJobSubmitDir,
- MRJobConfig.APPLICATION_TOKENS_FILE);
- tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf);
- LOG.info("Writing back the job-token file on the remote file system:"
- + remoteJobTokenFile.toString());
}
/**
@@ -1138,7 +1108,7 @@ public class JobImpl implements org.apac
job.isUber()); //Will transition to state running. Currently in INITED
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
- job.submitTime, job.startTime);
+ job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Nov 2 05:34:31 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -53,7 +52,6 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
@@ -98,16 +96,18 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -116,9 +116,12 @@ import org.apache.hadoop.yarn.state.Inva
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
+
/**
* Implementation of TaskAttempt interface.
*/
@@ -156,6 +159,8 @@ public abstract class TaskAttemptImpl im
private long finishTime;
private WrappedProgressSplitsBlock progressSplitBlock;
private int shufflePort = -1;
+ private String trackerName;
+ private int httpPort;
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
new CleanupContainerTransition();
@@ -421,9 +426,10 @@ public abstract class TaskAttemptImpl im
stateMachine;
private ContainerId containerID;
- private String nodeHostName;
+ private NodeId containerNodeId;
private String containerMgrAddress;
private String nodeHttpAddress;
+ private String nodeRackName;
private WrappedJvmID jvmID;
private ContainerToken containerToken;
private Resource assignedCapability;
@@ -434,6 +440,9 @@ public abstract class TaskAttemptImpl im
//this is the last status reported by the REMOTE running attempt
private TaskAttemptStatus reportedStatus;
+
+ private static final String LINE_SEPARATOR = System
+ .getProperty("line.separator");
public TaskAttemptImpl(TaskId taskId, int i,
@SuppressWarnings("rawtypes") EventHandler eventHandler,
@@ -526,8 +535,10 @@ public abstract class TaskAttemptImpl im
/**
* Create the {@link ContainerLaunchContext} for this attempt.
+ * @param applicationACLs
*/
- private ContainerLaunchContext createContainerLaunchContext() {
+ private ContainerLaunchContext createContainerLaunchContext(
+ Map<ApplicationAccessType, String> applicationACLs) {
// Application resources
Map<String, LocalResource> localResources =
@@ -611,7 +622,7 @@ public abstract class TaskAttemptImpl im
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(jobToken));
- MRApps.addToEnvironment(
+ Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
getInitialClasspath());
@@ -628,17 +639,11 @@ public abstract class TaskAttemptImpl im
jvmID);
// Construct the actual Container
- ContainerLaunchContext container =
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
- container.setContainerId(containerID);
- container.setUser(conf.get(MRJobConfig.USER_NAME));
- container.setResource(assignedCapability);
- container.setLocalResources(localResources);
- container.setEnvironment(environment);
- container.setCommands(commands);
- container.setServiceData(serviceData);
- container.setContainerTokens(tokens);
-
+ ContainerLaunchContext container = BuilderUtils
+ .newContainerLaunchContext(containerID, conf
+ .get(MRJobConfig.USER_NAME), assignedCapability, localResources,
+ environment, commands, serviceData, tokens, applicationACLs);
+
return container;
}
@@ -723,6 +728,19 @@ public abstract class TaskAttemptImpl im
readLock.unlock();
}
}
+
+ /**
+ * If a container has been assigned, returns the rack name (network
+ * location) of the node hosting it, otherwise null.
+ * The value is set when the container-assigned transition runs and is
+ * read here under the attempt's read lock.
+ */
+ @Override
+ public String getNodeRackName() {
+ this.readLock.lock();
+ try {
+ return this.nodeRackName;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
@@ -758,10 +776,16 @@ public abstract class TaskAttemptImpl im
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
- result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
+ result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters());
+ result.setContainerId(this.getAssignedContainerID());
+ result.setNodeManagerHost(trackerName);
+ result.setNodeManagerHttpPort(httpPort);
+ if (this.containerNodeId != null) {
+ result.setNodeManagerPort(this.containerNodeId.getPort());
+ }
return result;
} finally {
readLock.unlock();
@@ -855,7 +879,7 @@ public abstract class TaskAttemptImpl im
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
- taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+ taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
int simSlotsRequired =
slotMemoryReq
/ (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
@@ -888,15 +912,20 @@ public abstract class TaskAttemptImpl im
return jce;
}
- private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
- TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
- TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent(
- TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
- attemptState.toString(), taskAttempt.finishTime,
- taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName,
- taskAttempt.reportedStatus.diagnosticInfo.toString(),
- taskAttempt.getProgressSplitBlock().burst());
+ /**
+ * Builds a TaskAttemptUnsuccessfulCompletionEvent for the given attempt
+ * and final state.
+ * <p>The host argument is containerMgrAddress, the assigned container's
+ * NodeId string, or "UNKNOWN" if no container was assigned. The
+ * diagnostics argument is the attempt's diagnostics joined with the
+ * platform line separator.
+ * NOTE(review): previously this argument received only the bare host name
+ * (nodeHostName). NodeId.toString() presumably also includes the port;
+ * confirm that consumers of this history event accept that.
+ * @param taskAttempt the attempt whose data populates the event
+ * @param attemptState the final attempt state to record
+ * @return the populated event
+ */
+ private static
+ TaskAttemptUnsuccessfulCompletionEvent
+ createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
+ TaskAttemptState attemptState) {
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ new TaskAttemptUnsuccessfulCompletionEvent(
+ TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
+ .getTaskType()), attemptState.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.containerMgrAddress == null ? "UNKNOWN"
+ : taskAttempt.containerMgrAddress, StringUtils.join(
+ LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+ .getProgressSplitBlock().burst());
return tauce;
}
@@ -988,17 +1017,19 @@ public abstract class TaskAttemptImpl im
private static class ContainerAssignedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "deprecation" })
@Override
public void transition(final TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
- TaskAttemptContainerAssignedEvent cEvent =
+ final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event;
taskAttempt.containerID = cEvent.getContainer().getId();
- taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost();
- taskAttempt.containerMgrAddress = cEvent.getContainer().getNodeId()
+ taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
+ taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
.toString();
taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
+ taskAttempt.nodeRackName = RackResolver.resolve(
+ taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor):
@@ -1006,6 +1037,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.jvmID = new WrappedJvmID(
taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+ taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
//launch the container
//create the container object to be launched for a given Task attempt
@@ -1015,7 +1047,8 @@ public abstract class TaskAttemptImpl im
taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
@Override
public ContainerLaunchContext getContainer() {
- return taskAttempt.createContainerLaunchContext();
+ return taskAttempt.createContainerLaunchContext(cEvent
+ .getApplicationACLs());
}
@Override
public Task getRemoteTask() { // classic mapred Task, not YARN version
@@ -1095,14 +1128,18 @@ public abstract class TaskAttemptImpl im
//set the launch time
taskAttempt.launchTime = taskAttempt.clock.getTime();
+ taskAttempt.shufflePort = event.getShufflePort();
+
// register it to TaskAttemptListener so that it start listening
// for it
- taskAttempt.taskAttemptListener.register(
+ taskAttempt.taskAttemptListener.registerLaunchedTask(
taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
// Costly?
+ taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+ taskAttempt.httpPort = nodeHttpInetAddr.getPort();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
.getJobId());
@@ -1112,11 +1149,15 @@ public abstract class TaskAttemptImpl im
, 1);
taskAttempt.eventHandler.handle(jce);
+ LOG.info("TaskAttempt: [" + taskAttempt.attemptId
+ + "] using containerId: [" + taskAttempt.containerID + " on NM: ["
+ + taskAttempt.containerMgrAddress + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
- nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort());
+ nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
+ taskAttempt.shufflePort, taskAttempt.containerID);
taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle
@@ -1125,7 +1166,6 @@ public abstract class TaskAttemptImpl im
//make remoteTask reference as null as it is no more needed
//and free up the memory
taskAttempt.remoteTask = null;
- taskAttempt.shufflePort = event.getShufflePort();
//tell the Task that attempt has started
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
@@ -1152,6 +1192,7 @@ public abstract class TaskAttemptImpl im
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
+ @SuppressWarnings("deprecation")
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
TypeConverter.fromYarn(taskAttempt.attemptId));
@@ -1229,7 +1270,10 @@ public abstract class TaskAttemptImpl im
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
- finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
+ finishTime,
+ this.containerNodeId == null ? "UNKNOWN"
+ : this.containerNodeId.getHost(),
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());
@@ -1242,7 +1286,10 @@ public abstract class TaskAttemptImpl im
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
- finishTime, this.containerMgrAddress == null ? "UNKNOWN" : this.containerMgrAddress,
+ finishTime,
+ this.containerNodeId == null ? "UNKNOWN"
+ : this.containerNodeId.getHost(),
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());
@@ -1352,8 +1399,6 @@ public abstract class TaskAttemptImpl im
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
- //add to diagnostic
- taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
taskAttempt.updateProgressSplits();
//if fetch failures are present, send the fetch failure event to job
@@ -1381,7 +1426,6 @@ public abstract class TaskAttemptImpl im
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
- result.diagnosticInfo = "";
result.phase = Phase.STARTING;
result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Nov 2 05:34:31 2011
@@ -441,10 +441,20 @@ public abstract class TaskImpl implement
float progress = 0f;
TaskAttempt result = null;
for (TaskAttempt at : attempts.values()) {
+ switch (at.getState()) {
+
+ // ignore all failed task attempts
+ case FAIL_CONTAINER_CLEANUP:
+ case FAIL_TASK_CLEANUP:
+ case FAILED:
+ case KILL_CONTAINER_CLEANUP:
+ case KILL_TASK_CLEANUP:
+ case KILLED:
+ continue;
+ }
if (result == null) {
result = at; //The first time around
}
- //TODO: consider the nextAttemptNumber only if it is not failed/killed ?
// calculate the best progress
if (at.getProgress() > progress) {
result = at;
@@ -496,7 +506,7 @@ public abstract class TaskImpl implement
break;
case 1:
- Map newAttempts
+ Map<TaskAttemptId, TaskAttempt> newAttempts
= new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
@@ -558,7 +568,8 @@ public abstract class TaskImpl implement
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) {
- TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+ TaskAttemptCompletionEvent tce = recordFactory
+ .newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
tce.setMapOutputServerAddress("http://"
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Wed Nov 2 05:34:31 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.launcher;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.event.EventHandler;
public interface ContainerLauncher
@@ -28,4 +29,12 @@ public interface ContainerLauncher
CONTAINER_REMOTE_LAUNCH,
CONTAINER_REMOTE_CLEANUP
}
+
+ // Not a documented config. Only used for tests
+ static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
+ + "nm-command-timeout";
+ /**
+ * Maximum of 1 minute timeout for a Node to react to the command
+ */
+ static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000;
}