You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/09/26 10:44:41 UTC
svn commit: r1175718 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...
Author: acmurthy
Date: Mon Sep 26 08:44:41 2011
New Revision: 1175718
URL: http://svn.apache.org/viewvc?rev=1175718&view=rev
Log:
MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than (ApplicationId, startCount) consistently.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Sep 26 08:44:41 2011
@@ -1423,6 +1423,9 @@ Release 0.23.0 - Unreleased
"mapreduce.jobtracker.address" configuration value for
JobTracker: "local" (Venu Gopala Rao via mahadev)
+ MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than
+ (ApplicationId, startCount) consistently. (acmurthy)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Sep 26 08:44:41 2011
@@ -115,8 +115,6 @@ public class MRAppMaster extends Composi
private Clock clock;
private final long startTime = System.currentTimeMillis();
private String appName;
- private final int startCount;
- private final ApplicationId appID;
private final ApplicationAttemptId appAttemptID;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
@@ -134,21 +132,16 @@ public class MRAppMaster extends Composi
private Job job;
- public MRAppMaster(ApplicationId applicationId, int startCount) {
- this(applicationId, new SystemClock(), startCount);
+ public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
+ this(applicationAttemptId, new SystemClock());
}
- public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
+ public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
super(MRAppMaster.class.getName());
this.clock = clock;
- this.appID = applicationId;
- this.appAttemptID = RecordFactoryProvider.getRecordFactory(null)
- .newRecordInstance(ApplicationAttemptId.class);
- this.appAttemptID.setApplicationId(appID);
- this.appAttemptID.setAttemptId(startCount);
- this.startCount = startCount;
+ this.appAttemptID = applicationAttemptId;
this.metrics = MRAppMetrics.create();
- LOG.info("Created MRAppMaster for application " + applicationId);
+ LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@Override
@@ -160,9 +153,9 @@ public class MRAppMaster extends Composi
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
- && startCount > 1) {
+ && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
- Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
+ Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
@@ -265,8 +258,8 @@ public class MRAppMaster extends Composi
// ////////// End of obtaining the tokens needed by the job. //////////
// create single job
- Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
- taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
+ Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
+ taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -377,11 +370,11 @@ public class MRAppMaster extends Composi
}
public ApplicationId getAppID() {
- return appID;
+ return appAttemptID.getApplicationId();
}
public int getStartCount() {
- return startCount;
+ return appAttemptID.getAttemptId();
}
public AppContext getContext() {
@@ -506,7 +499,7 @@ public class MRAppMaster extends Composi
@Override
public ApplicationId getApplicationID() {
- return appID;
+ return appAttemptID.getApplicationId();
}
@Override
@@ -659,8 +652,7 @@ public class MRAppMaster extends Composi
}
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
- MRAppMaster appMaster = new MRAppMaster(applicationAttemptId
- .getApplicationId(), applicationAttemptId.getAttemptId());
+ MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Sep 26 08:44:41 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -101,6 +100,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -129,11 +129,11 @@ public class JobImpl implements org.apac
RecordFactoryProvider.getRecordFactory(null);
//final fields
+ private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
private final Map<JobACL, AccessControlList> jobACLs;
- private final int startCount;
private final Set<TaskId> completedTasksFromPreviousRun;
private final Lock readLock;
private final Lock writeLock;
@@ -365,26 +365,26 @@ public class JobImpl implements org.apac
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
- public JobImpl(ApplicationId appID, Configuration conf,
+ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
- Credentials fsTokenCredentials, Clock clock, int startCount,
+ Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName) {
-
+ this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
- this.startCount = startCount;
this.userName = userName;
- jobId.setAppId(appID);
- jobId.setId(appID.getId());
+ ApplicationId applicationId = applicationAttemptId.getApplicationId();
+ jobId.setAppId(applicationId);
+ jobId.setId(applicationId.getId());
oldJobId = TypeConverter.fromYarn(jobId);
LOG.info("Job created" +
- " appId=" + appID +
+ " appId=" + applicationId +
" jobId=" + jobId +
" oldJobId=" + oldJobId);
@@ -1078,7 +1078,8 @@ public class JobImpl implements org.apac
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
- job.clock, job.completedTasksFromPreviousRun, job.startCount,
+ job.clock, job.completedTasksFromPreviousRun,
+ job.applicationAttemptId.getAttemptId(),
job.metrics);
job.addTask(task);
}
@@ -1095,7 +1096,9 @@ public class JobImpl implements org.apac
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
job.fsTokens.getAllTokens(), job.clock,
- job.completedTasksFromPreviousRun, job.startCount, job.metrics);
+ job.completedTasksFromPreviousRun,
+ job.applicationAttemptId.getAttemptId(),
+ job.metrics);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Sep 26 08:44:41 2011
@@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -92,10 +92,9 @@ public class RecoveryService extends Com
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
- private final ApplicationId appID;
+ private final ApplicationAttemptId applicationAttemptId;
private final Dispatcher dispatcher;
private final ControlledClock clock;
- private final int startCount;
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
@@ -106,10 +105,10 @@ public class RecoveryService extends Com
private volatile boolean recoveryMode = false;
- public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+ public RecoveryService(ApplicationAttemptId applicationAttemptId,
+ Clock clock) {
super("RecoveringDispatcher");
- this.appID = appID;
- this.startCount = startCount;
+ this.applicationAttemptId = applicationAttemptId;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@@ -152,7 +151,8 @@ public class RecoveryService extends Com
private void parse() throws IOException {
// TODO: parse history file based on startCount
- String jobName = TypeConverter.fromYarn(appID).toString();
+ String jobName =
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
@@ -160,8 +160,9 @@ public class RecoveryService extends Com
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
+ //read the previous history file
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
- histDirPath, jobName, startCount - 1)); //read the previous history file
+ histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Sep 26 08:44:41 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -91,7 +92,7 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
- private final RecordFactory recordFactory =
+ private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//if true, tasks complete automatically as soon as they are launched
@@ -100,7 +101,7 @@ public class MRApp extends MRAppMaster {
static ApplicationId applicationId;
static {
- applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ applicationId = recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
}
@@ -108,9 +109,19 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
+
+ private static ApplicationAttemptId getApplicationAttemptId(
+ ApplicationId applicationId, int startCount) {
+ ApplicationAttemptId applicationAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ applicationAttemptId.setApplicationId(applicationId);
+ applicationAttemptId.setAttemptId(startCount);
+ return applicationAttemptId;
+ }
- public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
- super(applicationId, startCount);
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+ boolean cleanOnStart, int startCount) {
+ super(getApplicationAttemptId(applicationId, startCount));
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@@ -391,11 +402,12 @@ public class MRApp extends MRAppMaster {
return localStateMachine;
}
- public TestJob(Configuration conf, ApplicationId appID,
+ public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
- super(appID, conf, eventHandler, taskAttemptListener,
- new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
+ super(getApplicationAttemptId(applicationId, getStartCount()),
+ conf, eventHandler, taskAttemptListener,
+ new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, user);
// This "this leak" is okay because the retained pointer is in an