Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:42 UTC

svn commit: r1399950 [2/11] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduc...

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Oct 19 02:25:55 2012
@@ -99,8 +99,8 @@ public class JobHistoryEventHandler exte
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
 
-  // Has a signal (SIGTERM etc) been issued?
-  protected volatile boolean isSignalled = false;
+  // should job completion be forced when the AM shuts down?
+  protected volatile boolean forceJobCompletion = false;
 
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -322,7 +322,7 @@ public class JobHistoryEventHandler exte
     // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
     // closed their event writers
     Iterator<JobId> jobIt = fileMap.keySet().iterator();
-    if(isSignalled) {
+    if(forceJobCompletion) {
       while (jobIt.hasNext()) {
         JobId toClose = jobIt.next();
         MetaInfo mi = fileMap.get(toClose);
@@ -661,6 +661,8 @@ public class JobHistoryEventHandler exte
       summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
       summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
       summaryFileOut.close();
+      doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(
+          JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
     } catch (IOException e) {
       LOG.info("Unable to write out JobSummaryInfo to ["
           + qualifiedSummaryDoneFile + "]", e);
@@ -894,7 +896,7 @@ public class JobHistoryEventHandler exte
       
       stagingDirFS.delete(fromPath, false);
     }
-    }
+  }
 
   boolean pathExists(FileSystem fileSys, Path path) throws IOException {
     return fileSys.exists(path);
@@ -909,9 +911,9 @@ public class JobHistoryEventHandler exte
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
 
-  public void setSignalled(boolean isSignalled) {
-    this.isSignalled = isSignalled;
-    LOG.info("JobHistoryEventHandler notified that isSignalled was "
-      + isSignalled);
+  public void setForcejobCompletion(boolean forceJobCompletion) {
+    this.forceJobCompletion = forceJobCompletion;
+    LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+      + forceJobCompletion);
   }
 }
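
The renamed forceJobCompletion flag is consumed in stop(), as the second hunk
shows: when the flag is set, a completion event is forced for every job that
still has an open event writer. A minimal, self-contained sketch of that
shutdown pattern (illustrative class and field names, not the Hadoop source):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Sketch only: a volatile flag, set from a signal-handling thread,
    // tells stop() to force-close any jobs whose history writers are
    // still open so their history is not lost on SIGTERM.
    class HistoryHandlerSketch {
      private final Map<String, Object> openWriters =
          Collections.synchronizedMap(new HashMap<String, Object>());
      protected volatile boolean forceJobCompletion = false;

      void stop() {
        if (forceJobCompletion) {
          Iterator<String> it = openWriters.keySet().iterator();
          while (it.hasNext()) {
            System.out.println("Forcing completion event for " + it.next());
          }
        }
      }
    }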

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Oct 19 02:25:55 2012
@@ -87,8 +87,6 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.Clock;
@@ -172,6 +170,8 @@ public class MRAppMaster extends Composi
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
+  private volatile boolean isLastAMRetry = false;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime) {
@@ -197,11 +197,21 @@ public class MRAppMaster extends Composi
 
   @Override
   public void init(final Configuration conf) {
-
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
-
+    
+    //TODO this is a hack, we really need the RM to inform us when we
+    // are the last one.  This would allow us to configure retries on
+    // a per application basis.
+    int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, 
+        YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+    isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
+    LOG.info("AM Retries: " + numAMRetries + 
+        " attempt num: " + appAttemptID.getAttemptId() +
+        " is last retry: " + isLastAMRetry);
+    
+    
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name until we support DAG of jobs
@@ -419,6 +429,8 @@ public class MRAppMaster extends Composi
       }
 
       try {
+        //We are finishing cleanly so this is the last retry
+        isLastAMRetry = true;
         // Stop all services
         // This will also send the final report to the ResourceManager
         LOG.info("Calling stop for all the services");
@@ -478,27 +490,17 @@ public class MRAppMaster extends Composi
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
 
-      if (UserGroupInformation.isSecurityEnabled()) {
-        // Read the file-system tokens from the localized tokens-file.
-        Path jobSubmitDir = 
-            FileContext.getLocalFSFileContext().makeQualified(
-                new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
-                    .getAbsolutePath()));
-        Path jobTokenFile = 
-            new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
-        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
-        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
-            + jobTokenFile);
-
-        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Token of kind " + tk.getKind()
-                + "in current ugi in the AppMaster for service "
-                + tk.getService());
-          }
-          currentUser.addToken(tk); // For use by AppMaster itself.
-        }
-      }
+      // Read the file-system tokens from the localized tokens-file.
+      Path jobSubmitDir = 
+          FileContext.getLocalFSFileContext().makeQualified(
+              new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+                  .getAbsolutePath()));
+      Path jobTokenFile = 
+          new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+      fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+      LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+          + jobTokenFile);
+      currentUser.addCredentials(fsTokens); // For use by AppMaster itself.
     } catch (IOException e) {
       throw new YarnException(e);
     }
@@ -676,7 +678,11 @@ public class MRAppMaster extends Composi
     }
 
     public void setSignalled(boolean isSignalled) {
-      ((RMCommunicator) containerAllocator).setSignalled(true);
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
+    
+    public void setShouldUnregister(boolean shouldUnregister) {
+      ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
     }
   }
 
@@ -727,7 +733,12 @@ public class MRAppMaster extends Composi
     @Override
     public synchronized void stop() {
       try {
-        cleanupStagingDir();
+        if(isLastAMRetry) {
+          cleanupStagingDir();
+        } else {
+          LOG.info("Skipping cleaning up the staging dir. "
+              + "assuming AM will be retried.");
+        }
       } catch (IOException io) {
         LOG.error("Failed to cleanup staging dir: ", io);
       }
@@ -1026,14 +1037,19 @@ public class MRAppMaster extends Composi
     public void run() {
       LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
         + "JobHistoryEventHandler.");
+
       // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
       // that they don't take too long in shutting down
       if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
         ((ContainerAllocatorRouter) appMaster.containerAllocator)
         .setSignalled(true);
+        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+        .setShouldUnregister(appMaster.isLastAMRetry);
       }
+      
       if(appMaster.jobHistoryEventHandler != null) {
-        appMaster.jobHistoryEventHandler.setSignalled(true);
+        appMaster.jobHistoryEventHandler
+          .setForcejobCompletion(appMaster.isLastAMRetry);
       }
       appMaster.stop();
     }
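
The init() hunk above decides up front whether this attempt is the last AM
retry by comparing the attempt id against the configured maximum, and the
stop()-time staging-dir cleanup then keys off that flag. A small sketch of
the arithmetic (the maxRetries value below is illustrative, not the YARN
default):

    // Sketch of the last-retry check: attempts are 1-based, so with a
    // maximum of 2 retries, attempt 1 may skip staging-dir cleanup for a
    // possible successor, while attempt 2 must clean up.
    class LastRetrySketch {
      static boolean isLastRetry(int attemptId, int maxRetries) {
        return attemptId >= maxRetries;
      }

      public static void main(String[] args) {
        int maxRetries = 2; // illustrative value only
        for (int attempt = 1; attempt <= 3; attempt++) {
          System.out.println("attempt " + attempt + " is last retry: "
              + isLastRetry(attempt, maxRetries));
        }
      }
    }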

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Fri Oct 19 02:25:55 2012
@@ -82,8 +82,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
@@ -115,16 +114,15 @@ public class MRClientService extends Abs
     YarnRPC rpc = YarnRPC.create(conf);
     InetSocketAddress address = new InetSocketAddress(0);
 
-    ClientToAMSecretManager secretManager = null;
+    ClientToAMTokenSecretManager secretManager = null;
     if (UserGroupInformation.isSecurityEnabled()) {
-      secretManager = new ClientToAMSecretManager();
       String secretKeyStr =
           System
               .getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
       byte[] bytes = Base64.decodeBase64(secretKeyStr);
-      ClientTokenIdentifier identifier = new ClientTokenIdentifier(
-          this.appContext.getApplicationID());
-      secretManager.setMasterKey(identifier, bytes);
+      secretManager =
+          new ClientToAMTokenSecretManager(this.appContext.getApplicationID(),
+            bytes);
     }
     server =
         rpc.getServer(MRClientProtocol.class, protocolHandler, address,
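
The hunk above replaces the generic ClientToAMSecretManager with a
per-application ClientToAMTokenSecretManager seeded from a base64-encoded
secret in the container environment. A hedged sketch of just the
secret-decoding step (commons-codec Base64, as in the hunk; the helper name
is hypothetical):

    import org.apache.commons.codec.binary.Base64;

    class ClientSecretSketch {
      // Decode the client secret from an environment variable; the hunk
      // reads ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME.
      static byte[] decodeClientSecret(String envName) {
        String secretKeyStr = System.getenv(envName);
        return secretKeyStr == null ? null : Base64.decodeBase64(secretKeyStr);
      }
    }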

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 02:25:55 2012
@@ -582,17 +582,23 @@ public class JobImpl implements org.apac
       String jobFile =
           remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
 
+      StringBuilder diagsb = new StringBuilder();
+      for (String s : getDiagnostics()) {
+        diagsb.append(s).append("\n");
+      }
+
       if (getState() == JobState.NEW) {
         return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
             appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
-            cleanupProgress, jobFile, amInfos, isUber);
+            cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
       }
 
       computeProgress();
-      return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
-          appSubmitTime, startTime, finishTime, setupProgress,
+      JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username,
+          state, appSubmitTime, startTime, finishTime, setupProgress,
           this.mapProgress, this.reduceProgress,
-          cleanupProgress, jobFile, amInfos, isUber);
+          cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
+      return report;
     } finally {
       readLock.unlock();
     }
@@ -759,7 +765,8 @@ public class JobImpl implements org.apac
         job.getCommitter().commitJob(job.getJobContext());
       } catch (IOException e) {
         LOG.error("Could not do commit for Job", e);
-        job.logJobHistoryFinishedEvent();
+        job.addDiagnostic("Job commit failed: " + e.getMessage());
+        job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         return job.finished(JobState.FAILED);
       }
       job.logJobHistoryFinishedEvent();
@@ -1199,7 +1206,7 @@ public class JobImpl implements org.apac
     }
   }
 
-  private void abortJob(
+  protected void abortJob(
       org.apache.hadoop.mapreduce.JobStatus.State finalState) {
     try {
       committer.abortJob(jobContext, finalState);
@@ -1370,7 +1377,8 @@ public class JobImpl implements org.apac
           }
         }
         
-        float failureRate = (float) fetchFailures / runningReduceTasks;
+        float failureRate = runningReduceTasks == 0 ? 1.0f : 
+          (float) fetchFailures / runningReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
         boolean isMapFaulty =
             (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
@@ -1500,7 +1508,7 @@ public class JobImpl implements org.apac
     }
   }
 
-  private void addDiagnostic(String diag) {
+  protected void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }
   
@@ -1561,7 +1569,7 @@ public class JobImpl implements org.apac
     Path confPath = getConfFile();
     FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
     Configuration jobConf = new Configuration(false);
-    jobConf.addResource(fc.open(confPath));
+    jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
 }
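
Besides threading diagnostics into the job report, the hunks above guard the
fetch-failure rate against division by zero: when no reduces are running the
rate is pinned to 1.0, so the map is declared faulty rather than the math
producing NaN. A minimal sketch of the guard:

    class FetchFailureRateSketch {
      // Mirrors the guarded computation in the hunk: 0 running reduces is
      // treated as a 100% failure rate instead of dividing by zero.
      static float fetchFailureRate(int fetchFailures, int runningReduceTasks) {
        return runningReduceTasks == 0
            ? 1.0f
            : (float) fetchFailures / runningReduceTasks;
      }
    }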

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Oct 19 02:25:55 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.MapReduceChildJVM;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapred.Task;
@@ -71,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -86,6 +88,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -120,6 +123,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -128,6 +132,8 @@ import org.apache.hadoop.yarn.util.Build
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Implementation of TaskAttempt interface.
  */
@@ -404,10 +410,10 @@ public abstract class TaskAttemptImpl im
          TaskAttemptState.FAILED,
          TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
          new TooManyFetchFailureTransition())
-     .addTransition(
-         TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED,
-         TaskAttemptEventType.TA_KILL,
-         new KilledAfterSuccessTransition())
+      .addTransition(TaskAttemptState.SUCCEEDED,
+          EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED),
+          TaskAttemptEventType.TA_KILL, 
+          new KilledAfterSuccessTransition())
      .addTransition(
          TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
@@ -435,7 +441,8 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
-             TaskAttemptEventType.TA_FAILMSG))
+             TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
 
      // Transitions from KILLED state
      .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
@@ -604,10 +611,12 @@ public abstract class TaskAttemptImpl im
       if (jobJar != null) {
         Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
             .getUri(), remoteFS.getWorkingDirectory());
-        localResources.put(
-            MRJobConfig.JOB_JAR,
-            createLocalResource(remoteFS, remoteJobJar,
-                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+        LocalResource rc = createLocalResource(remoteFS, remoteJobJar,
+            LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
+        String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
+            JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+        rc.setPattern(pattern);
+        localResources.put(MRJobConfig.JOB_JAR, rc);
         LOG.info("The job-jar file on the remote FS is "
             + remoteJobJar.toUri().toASCIIString());
       } else {
@@ -638,14 +647,10 @@ public abstract class TaskAttemptImpl im
       MRApps.setupDistributedCache(conf, localResources);
 
       // Setup up task credentials buffer
-      Credentials taskCredentials = new Credentials();
-
-      if (UserGroupInformation.isSecurityEnabled()) {
-        LOG.info("Adding #" + credentials.numberOfTokens()
-            + " tokens and #" + credentials.numberOfSecretKeys()
-            + " secret keys for NM use for launching container");
-        taskCredentials.addAll(credentials);
-      }
+      LOG.info("Adding #" + credentials.numberOfTokens()
+          + " tokens and #" + credentials.numberOfSecretKeys()
+          + " secret keys for NM use for launching container");
+      Credentials taskCredentials = new Credentials(credentials);
 
       // LocalStorageToken is needed irrespective of whether security is enabled
       // or not.
@@ -1482,6 +1487,9 @@ public abstract class TaskAttemptImpl im
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      // too many fetch failures can only happen for map tasks
+      Preconditions
+          .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
       //add to diagnostic
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
       //set the finish time
@@ -1505,15 +1513,30 @@ public abstract class TaskAttemptImpl im
   }
   
   private static class KilledAfterSuccessTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptState> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(TaskAttemptImpl taskAttempt, 
+    public TaskAttemptState transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
-      TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
-      //add to diagnostic
-      taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+      if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
+        // after a reduce task has succeeded, its outputs are safe in HDFS.
+        // logically such a task should not be killed. we only come here when
+        // there is a race condition in the event queue. E.g. some logic sends
+        // a kill request to this attempt when the successful completion event
+        // for this task is already in the event queue. so the kill event will
+        // get executed immediately after the attempt is marked successful and 
+        // result in this transition being exercised.
+        // ignore this for reduce tasks
+        LOG.info("Ignoring killed event for successful reduce task attempt" +
+                  taskAttempt.getID().toString());
+        return TaskAttemptState.SUCCEEDED;
+      }
+      if(event instanceof TaskAttemptKillEvent) {
+        TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
+        //add to diagnostic
+        taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+      }
 
       // not setting a finish time since it was set on success
       assert (taskAttempt.getFinishTime() != 0);
@@ -1527,6 +1550,7 @@ public abstract class TaskAttemptImpl im
           .getTaskId().getJobId(), tauce));
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+      return TaskAttemptState.KILLED;
     }
   }
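
The switch from SingleArcTransition to MultipleArcTransition lets the
TA_KILL-after-success handler pick its end state at runtime: a reduce stays
SUCCEEDED because its output is already safe in HDFS, while a map moves to
KILLED and is rescheduled. A compact sketch of that decision, with
hypothetical enums standing in for the YARN state-machine types:

    class KilledAfterSuccessSketch {
      enum AttemptState { SUCCEEDED, KILLED }
      enum TaskKind { MAP, REDUCE }

      static AttemptState killedAfterSuccess(TaskKind kind) {
        if (kind == TaskKind.REDUCE) {
          return AttemptState.SUCCEEDED; // output committed; ignore late kill
        }
        return AttemptState.KILLED; // map output lives on the node; reschedule
      }
    }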
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -108,7 +110,8 @@ public abstract class TaskImpl implement
   private long scheduledTime;
   
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
+
+  protected boolean encryptedShuffle;
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
   
@@ -188,12 +191,12 @@ public abstract class TaskImpl implement
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
-    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+    .addTransition(TaskState.SUCCEEDED,
         EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
-        TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
-    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+        TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
+    .addTransition(TaskState.SUCCEEDED,
         EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
-        TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+        TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskState.SUCCEEDED, TaskState.SUCCEEDED,
@@ -274,6 +277,8 @@ public abstract class TaskImpl implement
     this.jobToken = jobToken;
     this.metrics = metrics;
     this.appContext = appContext;
+    this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                                            MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
 
     // See if this is from a previous generation.
     if (completedTasksFromPreviousRun != null
@@ -637,9 +642,10 @@ public abstract class TaskImpl implement
       TaskAttemptCompletionEvent tce = recordFactory
           .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
-      tce.setMapOutputServerAddress("http://"
-          + attempt.getNodeHttpAddress().split(":")[0] + ":"
-          + attempt.getShufflePort());
+      String scheme = (encryptedShuffle) ? "https://" : "http://";
+      tce.setMapOutputServerAddress(scheme
+         + attempt.getNodeHttpAddress().split(":")[0] + ":"
+         + attempt.getShufflePort());
       tce.setStatus(status);
       tce.setAttemptId(attempt.getID());
       int runTime = 0;
@@ -891,7 +897,7 @@ public abstract class TaskImpl implement
     }
   }
 
-  private static class MapRetroactiveFailureTransition
+  private static class RetroactiveFailureTransition
       extends AttemptFailedTransition {
 
     @Override
@@ -905,8 +911,8 @@ public abstract class TaskImpl implement
           return TaskState.SUCCEEDED;
         }
       }
-      
-      //verify that this occurs only for map task
+
+      // a successful REDUCE task should not be overridden
       //TODO: consider moving it to MapTaskImpl
       if (!TaskType.MAP.equals(task.getType())) {
         LOG.error("Unexpected event for REDUCE task " + event.getType());
@@ -932,42 +938,46 @@ public abstract class TaskImpl implement
     }
   }
 
-  private static class MapRetroactiveKilledTransition implements
+  private static class RetroactiveKilledTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
 
     @Override
     public TaskState transition(TaskImpl task, TaskEvent event) {
-      // verify that this occurs only for map task
+      TaskAttemptId attemptId = null;
+      if (event instanceof TaskTAttemptEvent) {
+        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+        attemptId = castEvent.getTaskAttemptID(); 
+        if (task.getState() == TaskState.SUCCEEDED &&
+            !attemptId.equals(task.successfulAttempt)) {
+          // don't allow a different task attempt to override a previous
+          // succeeded state
+          return TaskState.SUCCEEDED;
+        }
+      }
+
+      // a successful REDUCE task should not be overridden
       // TODO: consider moving it to MapTaskImpl
       if (!TaskType.MAP.equals(task.getType())) {
         LOG.error("Unexpected event for REDUCE task " + event.getType());
         task.internalError(event.getType());
       }
 
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      TaskAttemptId attemptId = attemptEvent.getTaskAttemptID();
-      if(task.successfulAttempt == attemptId) {
-        // successful attempt is now killed. reschedule
-        // tell the job about the rescheduling
-        unSucceed(task);
-        task.handleTaskAttemptCompletion(
-            attemptId, 
-            TaskAttemptCompletionEventStatus.KILLED);
-        task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
-        // typically we are here because this map task was run on a bad node and 
-        // we want to reschedule it on a different node.
-        // Depending on whether there are previous failed attempts or not this 
-        // can SCHEDULE or RESCHEDULE the container allocate request. If this
-        // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
-        // from the map splitInfo. So the bad node might be sent as a location 
-        // to the RM. But the RM would ignore that just like it would ignore 
-        // currently pending container requests affinitized to bad nodes.
-        task.addAndScheduleAttempt();
-        return TaskState.SCHEDULED;
-      } else {
-        // nothing to do
-        return TaskState.SUCCEEDED;
-      }
+      // successful attempt is now killed. reschedule
+      // tell the job about the rescheduling
+      unSucceed(task);
+      task.handleTaskAttemptCompletion(attemptId,
+          TaskAttemptCompletionEventStatus.KILLED);
+      task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
+      // typically we are here because this map task was run on a bad node and
+      // we want to reschedule it on a different node.
+      // Depending on whether there are previous failed attempts or not this
+      // can SCHEDULE or RESCHEDULE the container allocate request. If this
+      // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+      // from the map splitInfo. So the bad node might be sent as a location
+      // to the RM. But the RM would ignore that just like it would ignore
+      // currently pending container requests affinitized to bad nodes.
+      task.addAndScheduleAttempt();
+      return TaskState.SCHEDULED;
     }
   }
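
Alongside the kill-transition rework, the hunks above thread an
encrypted-shuffle flag (MRConfig.SHUFFLE_SSL_ENABLED_KEY) into the map-output
URL, flipping the scheme to https when SSL shuffle is enabled. A minimal
sketch of the URL construction (helper name is illustrative):

    class MapOutputAddressSketch {
      // Mirrors the tce.setMapOutputServerAddress(...) call in the hunk:
      // the scheme depends on the shuffle-SSL setting, the host comes from
      // the node's HTTP address, and the port from the shuffle handler.
      static String mapOutputAddress(boolean encryptedShuffle,
          String nodeHttpAddress, int shufflePort) {
        String scheme = encryptedShuffle ? "https://" : "http://";
        return scheme + nodeHttpAddress.split(":")[0] + ":" + shufflePort;
      }
    }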
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Oct 19 02:25:55 2012
@@ -84,6 +84,7 @@ public abstract class RMCommunicator ext
   private Job job;
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
+  private volatile boolean shouldUnregister = true;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -213,7 +214,9 @@ public abstract class RMCommunicator ext
     } catch (InterruptedException ie) {
       LOG.warn("InterruptedException while stopping", ie);
     }
-    unregister();
+    if(shouldUnregister) {
+      unregister();
+    }
     super.stop();
   }
 
@@ -288,8 +291,15 @@ public abstract class RMCommunicator ext
 
   protected abstract void heartbeat() throws Exception;
 
+  public void setShouldUnregister(boolean shouldUnregister) {
+    this.shouldUnregister = shouldUnregister;
+    LOG.info("RMCommunicator notified that shouldUnregistered is: " 
+        + shouldUnregister);
+  }
+  
   public void setSignalled(boolean isSignalled) {
     this.isSignalled = isSignalled;
-    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+    LOG.info("RMCommunicator notified that iSignalled is: " 
+        + isSignalled);
   }
 }
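
The new shouldUnregister flag defaults to true and is cleared only when a
signalled AM is not on its last retry, so a retriable AM leaves its RM
registration in place for the next attempt. A sketch of the guarded stop path
(hypothetical class; the RM RPC is omitted):

    class UnregisterSketch {
      private volatile boolean shouldUnregister = true;

      void setShouldUnregister(boolean shouldUnregister) {
        this.shouldUnregister = shouldUnregister;
      }

      synchronized void stop() {
        if (shouldUnregister) {
          unregister(); // final report to the RM; skipped when a retry follows
        }
      }

      private void unregister() { /* RM RPC omitted in this sketch */ }
    }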

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Oct 19 02:25:55 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,9 +46,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -131,6 +127,7 @@ public class RMContainerAllocator extend
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
+  private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
@@ -214,11 +211,18 @@ public class RMContainerAllocator extend
       scheduledRequests.assign(allocatedContainers);
       LOG.info("After Assign: " + getStat());
     }
-    
+
+    int completedMaps = getJob().getCompletedMaps();
+    int completedTasks = completedMaps + getJob().getCompletedReduces();
+    if (lastCompletedTasks != completedTasks) {
+      lastCompletedTasks = completedTasks;
+      recalculateReduceSchedule = true;
+    }
+
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
       scheduleReduces(
-          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
           mapResourceReqt, reduceResourceReqt,
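
The lastCompletedTasks bookkeeping above makes the reduce-scheduling pass run
whenever the completed-task count moves, rather than relying solely on the
recalculateReduceSchedule flag being set elsewhere. A minimal sketch of the
change-detection idiom (method name is illustrative):

    class ReduceScheduleTriggerSketch {
      private int lastCompletedTasks = 0;

      // Returns true only when the completed-task count changed since the
      // last heartbeat, which is when rescheduling work is worth doing.
      boolean shouldRecalculate(int completedMaps, int completedReduces) {
        int completedTasks = completedMaps + completedReduces;
        if (lastCompletedTasks != completedTasks) {
          lastCompletedTasks = completedTasks;
          return true;
        }
        return false;
      }
    }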

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/ConfBlock.java Fri Oct 19 02:25:55 2012
@@ -78,14 +78,29 @@ public class ConfBlock extends HtmlBlock
           tr().
             th(_TH, "key").
             th(_TH, "value").
+            th(_TH, "source chain").
           _().
         _().
       tbody();
       for (ConfEntryInfo entry : info.getProperties()) {
+        StringBuffer buffer = new StringBuffer();
+        String[] sources = entry.getSource();
+        // Skip the last entry, because it is always the same HDFS file, and
+        // output the rest in reverse order so the most recent is output first
+        boolean first = true;
+        for(int i = (sources.length  - 2); i >= 0; i--) {
+          if(!first) {
+            // \u2B05 is an arrow <--
+            buffer.append(" \u2B05 ");
+          }
+          first = false;
+          buffer.append(sources[i]);
+        }
         tbody.
           tr().
             td(entry.getName()).
             td(entry.getValue()).
+            td(buffer.toString()).
           _();
       }
       tbody._().
@@ -93,6 +108,7 @@ public class ConfBlock extends HtmlBlock
         tr().
           th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
           th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
+          th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._().
           _().
         _().
       _();
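
The rendering loop above walks each property's source list backwards,
skipping the final entry (always the same HDFS conf file) and joining the
rest with a leftwards arrow so the most recent source is shown first. A
standalone sketch of that join (using StringBuilder, the unsynchronized
variant, in place of the hunk's StringBuffer):

    class SourceChainSketch {
      static String sourceChain(String[] sources) {
        StringBuilder buffer = new StringBuilder();
        // Skip the last entry and iterate in reverse so the most recent
        // source comes first; separate entries with U+2B05 (leftwards arrow).
        for (int i = sources.length - 2; i >= 0; i--) {
          if (buffer.length() > 0) {
            buffer.append(" \u2B05 ");
          }
          buffer.append(sources[i]);
        }
        return buffer.toString();
      }
    }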

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java Fri Oct 19 02:25:55 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
-import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._EVEN;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._ODD;
@@ -31,6 +30,7 @@ import static org.apache.hadoop.yarn.web
 import java.util.Date;
 import java.util.List;
 
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -40,8 +40,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -106,7 +104,8 @@ public class JobBlock extends HtmlBlock 
       table.tr().
         td(String.valueOf(attempt.getAttemptId())).
         td(new Date(attempt.getStartTime()).toString()).
-        td().a(".nodelink", url("http://", attempt.getNodeHttpAddress()), 
+        td().a(".nodelink", url(HttpConfig.getSchemePrefix(),
+            attempt.getNodeHttpAddress()),
             attempt.getNodeHttpAddress())._().
         td().a(".logslink", url(attempt.getLogsLink()), 
             "logs")._().

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java Fri Oct 19 02:25:55 2012
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
 
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
 
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@@ -62,7 +63,8 @@ public class NavBlock extends HtmlBlock 
           li().a(url("conf", jobid), "Configuration")._().
           li().a(url("tasks", jobid, "m"), "Map tasks")._().
           li().a(url("tasks", jobid, "r"), "Reduce tasks")._().
-          li().a(".logslink", url("http://", nodeHttpAddress, "node",
+          li().a(".logslink", url(HttpConfig.getSchemePrefix(),
+              nodeHttpAddress, "node",
               "containerlogs", thisAmInfo.getContainerId().toString(), 
               app.getJob().getUserName()), 
               "AM Logs")._()._();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Fri Oct 19 02:25:55 2012
@@ -27,6 +27,7 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -93,13 +94,15 @@ public class TaskPage extends AppView {
           nodeTd._("N/A");
         } else {
           nodeTd.
-            a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
+            a(".nodelink", url(HttpConfig.getSchemePrefix(),
+                               nodeHttpAddr), nodeHttpAddr);
         }
         nodeTd._();
         if (containerId != null) {
           String containerIdStr = ta.getAssignedContainerIdStr();
           row.td().
-            a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
+              a(".logslink", url(HttpConfig.getSchemePrefix(),
+              nodeHttpAddr, "node", "containerlogs",
               containerIdStr, app.getJob().getUserName()), "logs")._();
         } else {
           row.td()._("N/A")._();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java Fri Oct 19 02:25:55 2012
@@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlAcce
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -63,8 +64,8 @@ public class AMAttemptInfo {
     ContainerId containerId = amInfo.getContainerId();
     if (containerId != null) {
       this.containerId = containerId.toString();
-      this.logsLink = join("http://" + nodeHttpAddress,
-          ujoin("node", "containerlogs", this.containerId));
+      this.logsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress,
+          ujoin("node", "containerlogs", this.containerId, user));
     }
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfEntryInfo.java Fri Oct 19 02:25:55 2012
@@ -27,13 +27,19 @@ public class ConfEntryInfo {
 
   protected String name;
   protected String value;
+  protected String[] source;
 
   public ConfEntryInfo() {
   }
 
   public ConfEntryInfo(String key, String value) {
+    this(key, value, null);
+  }
+  
+  public ConfEntryInfo(String key, String value, String[] source) {
     this.name = key;
     this.value = value;
+    this.source = source;
   }
 
   public String getName() {
@@ -43,4 +49,8 @@ public class ConfEntryInfo {
   public String getValue() {
     return this.value;
   }
+  
+  public String[] getSource() {
+    return source;
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/ConfInfo.java Fri Oct 19 02:25:55 2012
@@ -46,7 +46,8 @@ public class ConfInfo {
     Configuration jobConf = job.loadConfFile();
     this.path = job.getConfFile().toString();
     for (Map.Entry<String, String> entry : jobConf) {
-      this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
+      this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue(), 
+          jobConf.getPropertySources(entry.getKey())));
     }
 
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Oct 19 02:25:55 2012
@@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler 
     Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
 
     jheh.addToFileMap(jobId);
-    jheh.setSignalled(true);
+    jheh.setForcejobCompletion(true);
     for(int i=0; i < numEvents; ++i) {
       events[i] = getEventToEnqueue(jobId);
       jheh.handle(events[i]);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Oct 19 02:25:55 2012
@@ -603,7 +603,7 @@ public class MockJobs extends MockApps {
       public Configuration loadConfFile() throws IOException {
         FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
         Configuration jobConf = new Configuration(false);
-        jobConf.addResource(fc.open(configFile));
+        jobConf.addResource(fc.open(configFile), configFile.toString());
         return jobConf;
       }
     };

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Oct 19 02:25:55 2012
@@ -180,7 +180,7 @@ public class TestMRApp {
   @Test
   public void testUpdatedNodes() throws Exception {
     int runCount = 0;
-    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+    MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(),
         true, ++runCount);
     Configuration conf = new Configuration();
     // after half of the map completion, reduce will start
@@ -189,7 +189,7 @@ public class TestMRApp {
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
-    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+    Assert.assertEquals("Num tasks not correct", 4, job.getTasks().size());
     Iterator<Task> it = job.getTasks().values().iterator();
     Task mapTask1 = it.next();
     Task mapTask2 = it.next();
@@ -272,18 +272,19 @@ public class TestMRApp {
 
     // rerun
     // in rerun the 1st map will be recovered from previous run
-    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+    app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false,
         ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
-    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Assert.assertEquals("No of tasks not correct", 4, job.getTasks().size());
     it = job.getTasks().values().iterator();
     mapTask1 = it.next();
     mapTask2 = it.next();
-    Task reduceTask = it.next();
+    Task reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
 
     // map 1 will be recovered, no need to send done
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
@@ -306,19 +307,36 @@ public class TestMRApp {
     Assert.assertEquals("Expecting 1 more completion events for success", 3,
         events.length);
 
-    app.waitForState(reduceTask, TaskState.RUNNING);
-    TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+
+    TaskAttempt task3Attempt = reduceTask1.getAttempts().values().iterator()
         .next();
     app.getContext()
         .getEventHandler()
         .handle(
             new TaskAttemptEvent(task3Attempt.getID(),
                 TaskAttemptEventType.TA_DONE));
-    app.waitForState(reduceTask, TaskState.SUCCEEDED);
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task3Attempt.getID(),
+                TaskAttemptEventType.TA_KILL));
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    TaskAttempt task4Attempt = reduceTask2.getAttempts().values().iterator()
+        .next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task4Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
 
     events = job.getTaskAttemptCompletionEvents(0, 100);
-    Assert.assertEquals("Expecting 1 more completion events for success", 4,
-        events.length);
+    Assert.assertEquals("Expecting 2 more completion events for reduce success",
+        5, events.length);
 
     // job succeeds
     app.waitForState(job, JobState.SUCCEEDED);
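
Besides scaling the recovery test to two reduces, the new sequence delivers a TA_KILL to an attempt of reduceTask1 after that task has already succeeded and then re-asserts SUCCEEDED, pinning down that a late kill cannot regress a completed task. A toy model of that invariant (deliberately not Hadoop's TaskImpl state machine):

public class KillAfterSuccessDemo {
  enum TaskState { RUNNING, SUCCEEDED, KILLED }

  static TaskState state = TaskState.RUNNING;

  static void onDone() { if (state == TaskState.RUNNING) state = TaskState.SUCCEEDED; }

  // Kill events are honored only while the task is still running, so a
  // stray kill arriving after completion is a no-op.
  static void onKill() { if (state == TaskState.RUNNING) state = TaskState.KILLED; }

  public static void main(String[] args) {
    onDone();
    onKill();                  // TA_KILL after success
    System.out.println(state); // prints SUCCEEDED
  }
}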

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Oct 19 02:25:55 2012
@@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -139,7 +138,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -216,7 +215,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -282,7 +281,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -724,7 +723,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -828,7 +827,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator =
         new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
 
@@ -994,7 +993,7 @@ public class TestRMContainerAllocator {
     Job mockJob = mock(Job.class);
     when(mockJob.getReport()).thenReturn(
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
-            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob);
 
@@ -1099,8 +1098,7 @@ public class TestRMContainerAllocator {
       super();
       try {
         Configuration conf = new Configuration();
-        reinitialize(conf, new ContainerTokenSecretManager(conf),
-            rmContext);
+        reinitialize(conf, rmContext);
       } catch (IOException ie) {
         LOG.info("add application failed with ", ie);
         assert (false);
@@ -1409,7 +1407,63 @@ public class TestRMContainerAllocator {
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
   }
+
+  private static class RecalculateContainerAllocator extends MyContainerAllocator {
+    public boolean recalculatedReduceSchedule = false;
+
+    public RecalculateContainerAllocator(MyResourceManager rm,
+        Configuration conf, ApplicationAttemptId appAttemptId, Job job) {
+      super(rm, conf, appAttemptId, job);
+    }
+
+    @Override
+    public void scheduleReduces(int totalMaps, int completedMaps,
+        int scheduledMaps, int scheduledReduces, int assignedMaps,
+        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
+      recalculatedReduceSchedule = true;
+    }
+  }
   
+  @Test
+  public void testCompletedTasksRecalculateSchedule() throws Exception {
+    LOG.info("Running testCompletedTasksRecalculateSchedule");
+
+    Configuration conf = new Configuration();
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job job = mock(Job.class);
+    when(job.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    doReturn(10).when(job).getTotalMaps();
+    doReturn(10).when(job).getTotalReduces();
+    doReturn(0).when(job).getCompletedMaps();
+    RecalculateContainerAllocator allocator =
+        new RecalculateContainerAllocator(rm, conf, appAttemptId, job);
+    allocator.schedule();
+
+    allocator.recalculatedReduceSchedule = false;
+    allocator.schedule();
+    Assert.assertFalse("Unexpected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+
+    doReturn(1).when(job).getCompletedMaps();
+    allocator.schedule();
+    Assert.assertTrue("Expected recalculate of reduce schedule",
+        allocator.recalculatedReduceSchedule);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
@@ -1418,6 +1472,7 @@ public class TestRMContainerAllocator {
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
     t.testBlackListedNodes();
+    t.testCompletedTasksRecalculateSchedule();
   }
 
 }
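
The new test works through a subclass-and-flag probe: RecalculateContainerAllocator overrides scheduleReduces purely to record that it ran, and the test varies the completed-map count to check exactly when the allocator recomputes the reduce schedule (on progress, not on every heartbeat). The same probe pattern in isolation, with illustrative stand-in classes:

class Scheduler {
  private int lastCompletedMaps = -1;

  // Recompute the reduce schedule only when map progress changed, which is
  // the behavior the test above pins down.
  final void schedule(int completedMaps) {
    if (completedMaps != lastCompletedMaps) {
      lastCompletedMaps = completedMaps;
      scheduleReduces(completedMaps);
    }
  }

  void scheduleReduces(int completedMaps) { /* real work lives elsewhere */ }
}

class RecordingScheduler extends Scheduler {
  boolean recalculated = false;
  @Override void scheduleReduces(int completedMaps) { recalculated = true; }
}

public class ProbeDemo {
  public static void main(String[] args) {
    RecordingScheduler s = new RecordingScheduler();
    s.schedule(0);                                   // first call recalculates
    s.recalculated = false;
    s.schedule(0);                                   // no progress: skipped
    System.out.println("skipped: " + !s.recalculated);
    s.schedule(1);                                   // progress: recalculated
    System.out.println("recalculated: " + s.recalculated);
  }
}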

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBo
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -89,28 +91,98 @@ import org.junit.Test;
      handler.handle(new JobFinishEvent(jobid));
      verify(fs).delete(stagingJobPath, true);
    }
+   
+   @Test
+   public void testDeletionofStagingOnKill() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(0);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     appMaster.init(conf);
+     //simulate the process being killed
+     MRAppMaster.MRAppMasterShutdownHook hook = 
+       new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+     hook.run();
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+   
+   @Test
+   public void testDeletionofStagingOnKillLastTry() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(1);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     JobId jobid = recordFactory.newRecordInstance(JobId.class);
+     jobid.setAppId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     appMaster.init(conf);
+     //simulate the process being killed
+     MRAppMaster.MRAppMasterShutdownHook hook = 
+       new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+     hook.run();
+     verify(fs).delete(stagingJobPath, true);
+   }
 
    private class TestMRApp extends MRAppMaster {
+     ContainerAllocator allocator;
 
-    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-      super(applicationAttemptId, BuilderUtils.newContainerId(
-          applicationAttemptId, 1), "testhost", 2222, 3333, System
-          .currentTimeMillis());
-    }
-     
-    @Override
-    protected FileSystem getFileSystem(Configuration conf) {
-      return fs;
-    }
-    
-    @Override
-    protected void sysexit() {      
-    }
-    
-    @Override
-    public Configuration getConfig() {
-      return conf;
-    }
+     public TestMRApp(ApplicationAttemptId applicationAttemptId, 
+         ContainerAllocator allocator) {
+       super(applicationAttemptId, BuilderUtils.newContainerId(
+           applicationAttemptId, 1), "testhost", 2222, 3333, System
+           .currentTimeMillis());
+       this.allocator = allocator;
+     }
+
+     public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+       this(applicationAttemptId, null);
+     }
+
+     @Override
+     protected FileSystem getFileSystem(Configuration conf) {
+       return fs;
+     }
+
+     @Override
+     protected ContainerAllocator createContainerAllocator(
+         final ClientService clientService, final AppContext context) {
+       if(allocator == null) {
+         return super.createContainerAllocator(clientService, context);
+       }
+       return allocator;
+     }
+
+     @Override
+     protected void sysexit() {      
+     }
+
+     @Override
+     public Configuration getConfig() {
+       return conf;
+     }
+
+     @Override
+     protected void downloadTokensAndSetupUGI(Configuration conf) {
+     }
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -198,4 +270,4 @@ import org.junit.Test;
     Assert.assertTrue("Staging directory not cleaned before notifying RM",
         app.cleanedBeforeContainerAllocatorStopped);
   }
- }
\ No newline at end of file
+ }
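
The two kill tests differ only in where the attempt sits relative to RM_AM_MAX_RETRIES: attempt 0 with a maximum of 4 must leave the staging directory in place for the next AM retry (delete verified zero times), while attempt 1 with a maximum of 1 is the last try and must clean up. Reduced to a sketch, under the assumption that the check compares the attempt id against the retry limit (shouldCleanStaging is an illustrative name; the real decision lives in MRAppMaster's shutdown path):

public class StagingCleanupDemo {
  // Keep staging for recovery unless this attempt is the last permitted one.
  static boolean shouldCleanStaging(int attemptId, int maxRetries) {
    return attemptId >= maxRetries;
  }

  public static void main(String[] args) {
    System.out.println(shouldCleanStaging(0, 4)); // false: keep for next retry
    System.out.println(shouldCleanStaging(1, 1)); // true: last try, delete
  }
}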

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Oct 19 02:25:55 2012
@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -43,11 +45,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Records;
@@ -91,8 +96,6 @@ public class TestJobImpl {
     when(mockJob.getCommitter()).thenReturn(mockCommitter);
     when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    doNothing().when(mockJob).setFinishTime();
-    doNothing().when(mockJob).logJobHistoryFinishedEvent();
     when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
     when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
     when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
@@ -103,11 +106,13 @@ public class TestJobImpl {
       // commitJob stubbed out, so this can't happen
     }
     doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob);
     Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
+      "for successful job", jobState);
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.FAILED, JobImpl.checkJobCompleteSuccess(mockJob));
+        JobState.FAILED, jobState);
+    verify(mockJob).abortJob(
+        eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
 
   @Test
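
The reworked assertions call checkJobCompleteSuccess once, test both non-nullness and the FAILED state on that single result, and additionally verify that the failed commit triggered abortJob(State.FAILED). The commit-or-abort contract being verified, in miniature (these types are illustrative, not the real OutputCommitter API):

public class CommitOrAbortDemo {
  interface Committer {
    void commitJob() throws Exception;
    void abortJob(String state);
  }

  // A failed commit must not leave the job dangling: abort with FAILED and
  // report FAILED as the resulting state.
  static String finish(Committer c) {
    try {
      c.commitJob();
      return "SUCCEEDED";
    } catch (Exception e) {
      c.abortJob("FAILED");
      return "FAILED";
    }
  }

  public static void main(String[] args) {
    Committer failing = new Committer() {
      public void commitJob() throws Exception { throw new Exception("commit failed"); }
      public void abortJob(String state) { System.out.println("abortJob(" + state + ")"); }
    };
    System.out.println(finish(failing)); // abortJob(FAILED), then FAILED
  }
}
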
@@ -170,6 +175,8 @@ public class TestJobImpl {
     t.testCheckJobCompleteSuccess();
     t.testCheckJobCompleteSuccessFailed();
     t.testCheckAccess();
+    t.testReportDiagnostics();
+    t.testUberDecision();
   }
 
   @Test
@@ -239,6 +246,41 @@ public class TestJobImpl {
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
+
+  @Test
+  public void testReportDiagnostics() throws Exception {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    final String diagMsg = "some diagnostic message";
+    final JobDiagnosticsUpdateEvent diagUpdateEvent =
+        new JobDiagnosticsUpdateEvent(jobId, diagMsg);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), new Configuration(),
+        mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null,
+        new SystemClock(), null,
+        mrAppMetrics, mock(OutputCommitter.class),
+        true, null, 0, null, null);
+    job.handle(diagUpdateEvent);
+    String diagnostics = job.getReport().getDiagnostics();
+    Assert.assertNotNull(diagnostics);
+    Assert.assertTrue(diagnostics.contains(diagMsg));
+
+    job = new JobImpl(jobId, Records
+        .newRecord(ApplicationAttemptId.class), new Configuration(),
+        mock(EventHandler.class),
+        null, mock(JobTokenSecretManager.class), null,
+        new SystemClock(), null,
+        mrAppMetrics, mock(OutputCommitter.class),
+        true, null, 0, null, null);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    job.handle(diagUpdateEvent);
+    diagnostics = job.getReport().getDiagnostics();
+    Assert.assertNotNull(diagnostics);
+    Assert.assertTrue(diagnostics.contains(diagMsg));
+  }
+
   @Test
   public void testUberDecision() throws Exception {