You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/09/26 10:44:41 UTC

svn commit: r1175718 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...

Author: acmurthy
Date: Mon Sep 26 08:44:41 2011
New Revision: 1175718

URL: http://svn.apache.org/viewvc?rev=1175718&view=rev
Log:
MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than (ApplicationId, startCount) consistently. 

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Sep 26 08:44:41 2011
@@ -1423,6 +1423,9 @@ Release 0.23.0 - Unreleased
     "mapreduce.jobtracker.address" configuration value for 
     JobTracker: "local" (Venu Gopala Rao via mahadev)
 
+    MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than
+    (ApplicationId, startCount) consistently. (acmurthy)  
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Sep 26 08:44:41 2011
@@ -115,8 +115,6 @@ public class MRAppMaster extends Composi
   private Clock clock;
   private final long startTime = System.currentTimeMillis();
   private String appName;
-  private final int startCount;
-  private final ApplicationId appID;
   private final ApplicationAttemptId appAttemptID;
   protected final MRAppMetrics metrics;
   private Set<TaskId> completedTasksFromPreviousRun;
@@ -134,21 +132,16 @@ public class MRAppMaster extends Composi
 
   private Job job;
   
-  public MRAppMaster(ApplicationId applicationId, int startCount) {
-    this(applicationId, new SystemClock(), startCount);
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
+    this(applicationAttemptId, new SystemClock());
   }
 
-  public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
+  public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
-    this.appID = applicationId;
-    this.appAttemptID = RecordFactoryProvider.getRecordFactory(null)
-        .newRecordInstance(ApplicationAttemptId.class);
-    this.appAttemptID.setApplicationId(appID);
-    this.appAttemptID.setAttemptId(startCount);
-    this.startCount = startCount;
+    this.appAttemptID = applicationAttemptId;
     this.metrics = MRAppMetrics.create();
-    LOG.info("Created MRAppMaster for application " + applicationId);
+    LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
   @Override
@@ -160,9 +153,9 @@ public class MRAppMaster extends Composi
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
 
     if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
-         && startCount > 1) {
+         && appAttemptID.getAttemptId() > 1) {
       LOG.info("Recovery is enabled. Will try to recover from previous life.");
-      Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
+      Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
       addIfService(recoveryServ);
       dispatcher = recoveryServ.getDispatcher();
       clock = recoveryServ.getClock();
@@ -265,8 +258,8 @@ public class MRAppMaster extends Composi
     // ////////// End of obtaining the tokens needed by the job. //////////
 
     // create single job
-    Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
-        taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
+    Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
+        taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
         completedTasksFromPreviousRun, metrics, currentUser.getUserName());
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
@@ -377,11 +370,11 @@ public class MRAppMaster extends Composi
   }
 
   public ApplicationId getAppID() {
-    return appID;
+    return appAttemptID.getApplicationId();
   }
 
   public int getStartCount() {
-    return startCount;
+    return appAttemptID.getAttemptId();
   }
 
   public AppContext getContext() {
@@ -506,7 +499,7 @@ public class MRAppMaster extends Composi
 
     @Override
     public ApplicationId getApplicationID() {
-      return appID;
+      return appAttemptID.getApplicationId();
     }
 
     @Override
@@ -659,8 +652,7 @@ public class MRAppMaster extends Composi
       }
       ApplicationAttemptId applicationAttemptId = ConverterUtils
           .toApplicationAttemptId(applicationAttemptIdStr);
-      MRAppMaster appMaster = new MRAppMaster(applicationAttemptId
-          .getApplicationId(), applicationAttemptId.getAttemptId());
+      MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
       Runtime.getRuntime().addShutdownHook(
           new CompositeServiceShutdownHook(appMaster));
       YarnConfiguration conf = new YarnConfiguration(new JobConf());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Sep 26 08:44:41 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceChildJVM;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -101,6 +100,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -129,11 +129,11 @@ public class JobImpl implements org.apac
       RecordFactoryProvider.getRecordFactory(null);
   
   //final fields
+  private final ApplicationAttemptId applicationAttemptId;
   private final Clock clock;
   private final JobACLsManager aclsManager;
   private final String username;
   private final Map<JobACL, AccessControlList> jobACLs;
-  private final int startCount;
   private final Set<TaskId> completedTasksFromPreviousRun;
   private final Lock readLock;
   private final Lock writeLock;
@@ -365,26 +365,26 @@ public class JobImpl implements org.apac
   private Token<JobTokenIdentifier> jobToken;
   private JobTokenSecretManager jobTokenSecretManager;
 
-  public JobImpl(ApplicationId appID, Configuration conf,
+  public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
       EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
-      Credentials fsTokenCredentials, Clock clock, int startCount, 
+      Credentials fsTokenCredentials, Clock clock, 
       Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
       String userName) {
-
+    this.applicationAttemptId = applicationAttemptId;
     this.jobId = recordFactory.newRecordInstance(JobId.class);
     this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
     this.conf = conf;
     this.metrics = metrics;
     this.clock = clock;
     this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
-    this.startCount = startCount;
     this.userName = userName;
-    jobId.setAppId(appID);
-    jobId.setId(appID.getId());
+    ApplicationId applicationId = applicationAttemptId.getApplicationId();
+    jobId.setAppId(applicationId);
+    jobId.setId(applicationId.getId());
     oldJobId = TypeConverter.fromYarn(jobId);
     LOG.info("Job created" +
-    		" appId=" + appID + 
+    		" appId=" + applicationId + 
     		" jobId=" + jobId + 
     		" oldJobId=" + oldJobId);
     
@@ -1078,7 +1078,8 @@ public class JobImpl implements org.apac
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
                 job.committer, job.jobToken, job.fsTokens.getAllTokens(), 
-                job.clock, job.completedTasksFromPreviousRun, job.startCount,
+                job.clock, job.completedTasksFromPreviousRun, 
+                job.applicationAttemptId.getAttemptId(),
                 job.metrics);
         job.addTask(task);
       }
@@ -1095,7 +1096,9 @@ public class JobImpl implements org.apac
                 job.conf, job.numMapTasks, 
                 job.taskAttemptListener, job.committer, job.jobToken,
                 job.fsTokens.getAllTokens(), job.clock, 
-                job.completedTasksFromPreviousRun, job.startCount, job.metrics);
+                job.completedTasksFromPreviousRun, 
+                job.applicationAttemptId.getAttemptId(),
+                job.metrics);
         job.addTask(task);
       }
       LOG.info("Number of reduces for job " + job.jobId + " = "

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Sep 26 08:44:41 2011
@@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -92,10 +92,9 @@ public class RecoveryService extends Com
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
-  private final ApplicationId appID;
+  private final ApplicationAttemptId applicationAttemptId;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
-  private final int startCount;
 
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
@@ -106,10 +105,10 @@ public class RecoveryService extends Com
 
   private volatile boolean recoveryMode = false;
 
-  public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
+  public RecoveryService(ApplicationAttemptId applicationAttemptId, 
+      Clock clock) {
     super("RecoveringDispatcher");
-    this.appID = appID;
-    this.startCount = startCount;
+    this.applicationAttemptId = applicationAttemptId;
     this.dispatcher = new RecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -152,7 +151,8 @@ public class RecoveryService extends Com
 
   private void parse() throws IOException {
     // TODO: parse history file based on startCount
-    String jobName = TypeConverter.fromYarn(appID).toString();
+    String jobName = 
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
     String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
     FSDataInputStream in = null;
     Path historyFile = null;
@@ -160,8 +160,9 @@ public class RecoveryService extends Com
         new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
         getConfig());
+    //read the previous history file
     historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, startCount - 1));          //read the previous history file
+        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));          
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1175718&r1=1175717&r2=1175718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Sep 26 08:44:41 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -91,7 +92,7 @@ public class MRApp extends MRAppMaster {
   private File testWorkDir;
   private Path testAbsPath;
 
-  private final RecordFactory recordFactory =
+  private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
   //if true, tasks complete automatically as soon as they are launched
@@ -100,7 +101,7 @@ public class MRApp extends MRAppMaster {
   static ApplicationId applicationId;
 
   static {
-    applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+    applicationId = recordFactory.newRecordInstance(ApplicationId.class);
     applicationId.setClusterTimestamp(0);
     applicationId.setId(0);
   }
@@ -108,9 +109,19 @@ public class MRApp extends MRAppMaster {
   public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
     this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
   }
+  
+  private static ApplicationAttemptId getApplicationAttemptId(
+      ApplicationId applicationId, int startCount) {
+    ApplicationAttemptId applicationAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    applicationAttemptId.setApplicationId(applicationId);
+    applicationAttemptId.setAttemptId(startCount);
+    return applicationAttemptId;
+  }
 
-  public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
-    super(applicationId, startCount);
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName, 
+      boolean cleanOnStart, int startCount) {
+    super(getApplicationAttemptId(applicationId, startCount));
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -391,11 +402,12 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(Configuration conf, ApplicationId appID,
+    public TestJob(Configuration conf, ApplicationId applicationId,
         EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
         Clock clock, String user) {
-      super(appID, conf, eventHandler, taskAttemptListener,
-          new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), 
+      super(getApplicationAttemptId(applicationId, getStartCount()), 
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock, 
           getCompletedTaskFromPreviousRun(), metrics, user);
 
       // This "this leak" is okay because the retained pointer is in an