You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [25/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Jun 21 06:37:27 2013
@@ -44,17 +44,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.Counters.CountersExceededException;
-import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -71,7 +67,7 @@ import org.apache.hadoop.util.Shell;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
- * a Job on the straight and narrow.  It keeps its JobProfile
+ * a Job on the straight and narrow. It keeps its JobProfile
  * and its latest JobStatus, plus a set of tables for 
  * doing bookkeeping of its Tasks.
  * ***********************************************************
@@ -256,7 +252,6 @@ public class JobInProgress {
   private String submitHostAddress;
   private String user;
   private String historyFile = "";
-  private boolean historyFileCopied;
   
   // Per-job counters
   public static enum Counter { 
@@ -266,6 +261,7 @@ public class JobInProgress {
     TOTAL_LAUNCHED_REDUCES,
     OTHER_LOCAL_MAPS,
     DATA_LOCAL_MAPS,
+    NODEGROUP_LOCAL_MAPS,
     RACK_LOCAL_MAPS,
     SLOTS_MILLIS_MAPS,
     SLOTS_MILLIS_REDUCES,
@@ -520,7 +516,7 @@ public class JobInProgress {
   public void cleanUpMetrics() {
     // per job metrics is disabled for now.
   }
-    
+
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
     LOG.info("The taskcache info:");
     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -656,12 +652,44 @@ public class JobInProgress {
     this.numSlotsPerReduce = numSlotsPerReduce;
   }
 
+  static final String JOB_INIT_EXCEPTION = 
+      "mapreduce.job.init.throw.exception";
+  static final String JT_JOB_INIT_EXCEPTION_OVERRIDE = 
+      "mapreduce.jt.job.init.throw.exception.override";
+  
+  Object jobInitWaitLockForTests = new Object();
+  
+  void signalInitWaitLockForTests() {
+    synchronized (jobInitWaitLockForTests) {
+      jobInitWaitLockForTests.notify();
+    }
+  }
+  
+  void waitForInitWaitLockForTests() {
+    synchronized (jobInitWaitLockForTests) {
+      try {
+        LOG.info("About to wait for jobInitWaitLockForTests");
+        jobInitWaitLockForTests.wait();
+        LOG.info("Done waiting for jobInitWaitLockForTests");
+      } catch (InterruptedException ie) {
+        // Should never occur
+      }
+    }
+  }
+  
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
   public synchronized void initTasks() 
   throws IOException, KillInterruptedException, UnknownHostException {
+    // Only for tests
+    if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false) 
+        &&
+        getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {
+        waitForInitWaitLockForTests();
+    }
+    
     if (tasksInited || isComplete()) {
       return;
     }
@@ -692,18 +720,13 @@ public class JobInProgress {
     setPriority(this.priority);
     
     //
-    // generate security keys needed by Tasks
-    //
-    generateAndStoreTokens();
-    
-    //
     // read input splits and create a map per a split
     //
     TaskSplitMetaInfo[] splits = createSplits(jobId);
     if (numMapTasks != splits.length) {
       throw new IOException("Number of maps in JobConf doesn't match number of " +
-      		"recieved splits for job " + jobId + "! " +
-      		"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
+          "recieved splits for job " + jobId + "! " +
+          "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
     }
     numMapTasks = splits.length;
 
@@ -1311,27 +1334,49 @@ public class JobInProgress {
                                             int clusterSize, 
                                             int numUniqueHosts
                                            ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 
+        anyCacheLevel);
+  }
+
+  /**
+   * Return a MapTask with locality level that smaller or equal than a given
+   * locality level to tasktracker.
+   * 
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
+   * @param avgProgress The average progress of this kind of task in this job
+   * @param maxCacheLevel The maximum topology level until which to schedule
+   *                      maps.
+   * @return the index in tasks of the selected task (or -1 for no task)
+   * @throws IOException
+   */
+  public synchronized Task obtainNewMapTaskCommon(
+      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts, 
+      int maxCacheLevel) throws IOException {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
       return null;
     }
-        
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
+
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel, 
                                 status.mapProgress());
     if (target == -1) {
       return null;
     }
-    
+
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
+      // DO NOT reset for off-switch!
+      if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
+        resetSchedulingOpportunities();
+      }
     }
-
     return result;
-  }    
+  }
 
   /*
    * Return task cleanup attempt if any, to run on a given tracker
@@ -1374,78 +1419,22 @@ public class JobInProgress {
   public synchronized Task obtainNewNodeLocalMapTask(TaskTrackerStatus tts,
                                                      int clusterSize,
                                                      int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 1, 
-                                status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
-    }
-
-    return result;
+      throws IOException {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 1);
   }
   
   public synchronized Task obtainNewNodeOrRackLocalMapTask(
       TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
   throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
-                                status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
-    }
-
-    return result;
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
   }
   
   public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
                                                     int clusterSize, 
                                                     int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL, status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      // DO NOT reset for off-switch!
-    }
-
-    return result;
+      throws IOException {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 
+        NON_LOCAL_CACHE_LEVEL);
   }
   
   public void schedulingOpportunity() {
@@ -1620,7 +1609,7 @@ public class JobInProgress {
   }
   
   public synchronized boolean scheduleReduces() {
-    return finishedMapTasks >= completedMapsForReduceSlowstart;
+    return finishedMapTasks + failedMapTIPs >= completedMapsForReduceSlowstart;
   }
   
   /**
@@ -1693,16 +1682,37 @@ public class JobInProgress {
   
   // returns the (cache)level at which the nodes matches
   private int getMatchingLevelForNodes(Node n1, Node n2) {
+    return getMatchingLevelForNodes(n1, n2, this.maxLevel);
+  }
+
+  static int getMatchingLevelForNodes(Node n1, Node n2, int maxLevel) {
     int count = 0;
+
+    // In the case that the two nodes are at different levels in the
+    // node heirarchy, walk upwards on the deeper one until the
+    // levels are equal. Each of these counts as "distance" since it
+    // assumedly is going through another rack.
+    int level1 = n1.getLevel(), level2 = n2.getLevel();
+    while (n1 != null && level1 > level2) {
+      n1 = n1.getParent();
+      level1--;
+      count++;
+    }
+    while (n2 != null && level2 > level1) {
+      n2 = n2.getParent();
+      level2--;
+      count++;
+    }
+
     do {
-      if (n1.equals(n2)) {
-        return count;
+      if (n1.equals(n2) || count >= maxLevel) {
+        return Math.min(count, maxLevel);
       }
       ++count;
       n1 = n1.getParent();
       n2 = n2.getParent();
     } while (n1 != null);
-    return this.maxLevel;
+    return maxLevel;
   }
 
   /**
@@ -1729,7 +1739,7 @@ public class JobInProgress {
     // keeping the earlier ordering intact
     String name;
     String splits = "";
-    Enum counter = null;
+    Enum<Counter> counter = null;
     if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
@@ -1776,6 +1786,7 @@ public class JobInProgress {
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
+    Locality locality = Locality.OFF_SWITCH;
     if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
@@ -1795,26 +1806,71 @@ public class JobInProgress {
           }
         }
       }
-      switch (level) {
-      case 0 :
-        LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-        break;
+      locality = logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware());
+    }
+    // Set locality
+    tip.setTaskAttemptLocality(id, locality);
+
+    // Set avataar
+    Avataar avataar = (tip.getActiveTasks().size() > 1) ? Avataar.SPECULATIVE :
+        Avataar.VIRGIN;
+    tip.setTaskAttemptAvataar(id, avataar);
+  }
+
+  private Locality logAndIncreJobCounters(TaskInProgress tip, int level, 
+      boolean isNodeGroupAware) {
+    switch (level) {
+      case 0:
+        // level 0 means data-local
+        logAndIncrDataLocalMaps(tip);
+        return Locality.NODE_LOCAL;
       case 1:
-        LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
-        break;
-      default :
+        if (isNodeGroupAware) {
+          // level 1 in case of with-NodeGroup means nodegroup-local
+          logAndIncrNodeGroupLocalMaps(tip);
+          return Locality.GROUP_LOCAL;
+        } else {
+          // level 1 in case of without-NodeGroup means rack-local
+          logAndIncrRackLocalMaps(tip);
+          return Locality.RACK_LOCAL;
+        }
+      case 2:
+        if (isNodeGroupAware) {
+          // level 2 in case of with-NodeGroup means rack-local
+          logAndIncrRackLocalMaps(tip);
+          return Locality.RACK_LOCAL;
+        }
+        // in case of without-NodeGroup, level 2 falls through to other-local
+        // handled by default
+      default:
         // check if there is any locality
         if (level != this.maxLevel) {
-          LOG.info("Choosing cached task at level " + level + tip.getTIPId());
-          jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+          logAndIncrOtherLocalMaps(tip, level);
         }
-        break;
-      }
+        return Locality.OFF_SWITCH;
     }
   }
 
+  private void logAndIncrOtherLocalMaps(TaskInProgress tip, int level) {
+    LOG.info("Choosing cached task at level " + level + tip.getTIPId());
+    jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrNodeGroupLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing nodeGroup-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.NODEGROUP_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrRackLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing rack-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrDataLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing data-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+  }
+
   void setFirstTaskLaunchTime(TaskInProgress tip) {
     TaskType key = tip.getFirstTaskType();
 
@@ -2194,21 +2250,19 @@ public class JobInProgress {
         continue;
       }
 
-      if (!tip.hasRunOnMachine(ttStatus.getHost(), 
+      if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
+        // Check if this tip can be removed from the list.
+        // If the list is shared then we should not remove.
+        if(shouldRemove){
+          iter.remove();
+        }
+        if (!tip.hasRunOnMachine(ttStatus.getHost(),
                                ttStatus.getTrackerName())) {
-        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
-          // In case of shared list we don't remove it. Since the TIP failed 
-          // on this tracker can be scheduled on some other tracker.
-          if (shouldRemove) {
-            iter.remove(); //this tracker is never going to run it again
-          }
           return tip;
-        } 
+        }
       } else {
-        // Check if this tip can be removed from the list.
-        // If the list is shared then we should not remove.
-        if (shouldRemove) {
-          // This tracker will never speculate this tip
+        if (shouldRemove && tip.hasRunOnMachine(ttStatus.getHost(),
+                                         ttStatus.getTrackerName())) {
           iter.remove();
         }
       }
@@ -2572,22 +2626,27 @@ public class JobInProgress {
       this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     String taskType = getTaskType(tip);
+    TaskAttemptID taskAttemptId = status.getTaskID();
+    Locality locality = checkLocality(tip, taskAttemptId);
+    Avataar avataar = checkAvataar(tip, taskAttemptId);
     if (status.getIsMap()){
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
+      JobHistory.MapAttempt.logStarted(taskAttemptId, status.getStartTime(), 
                                        status.getTaskTracker(), 
-                                       ttStatus.getHttpPort(), 
-                                       taskType); 
-      JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
+                                       ttStatus.getHttpPort(),
+                                       taskType,locality, avataar);
+      JobHistory.MapAttempt.logFinished(taskAttemptId, status.getFinishTime(), 
                                         trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 
     }else{
-      JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
+      JobHistory.ReduceAttempt.logStarted(taskAttemptId, status.getStartTime(), 
                                           status.getTaskTracker(),
                                           ttStatus.getHttpPort(), 
-                                          taskType); 
-      JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
-                                           status.getSortFinishTime(), status.getFinishTime(), 
+                                          taskType, locality, avataar);
+      JobHistory.ReduceAttempt.logFinished(taskAttemptId,
+                                           status.getShuffleFinishTime(),
+                                           status.getSortFinishTime(),
+                                           status.getFinishTime(),
                                            trackerHostname, 
                                            taskType,
                                            status.getStateString(), 
@@ -3032,9 +3091,12 @@ public class JobInProgress {
     String diagInfo = taskDiagnosticInfo == null ? "" :
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
     String taskType = getTaskType(tip);
+    TaskAttemptID taskAttemptId = status.getTaskID();
+    Locality locality = checkLocality(tip, taskAttemptId);
+    Avataar avataar = checkAvataar(tip, taskAttemptId);
     if (taskStatus.getIsMap()) {
       JobHistory.MapAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
+        taskTrackerName, taskTrackerPort, taskType, locality, avataar);
       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(taskid, finishTime,
           taskTrackerHostName, diagInfo, taskType);
@@ -3044,7 +3106,7 @@ public class JobInProgress {
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
+        taskTrackerName, taskTrackerPort, taskType, locality, avataar);
       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
           taskTrackerHostName, diagInfo, taskType);
@@ -3140,6 +3202,22 @@ public class JobInProgress {
     }
   }
 
+  private Locality checkLocality(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+    Locality locality = tip.getTaskAttemptLocality(taskAttemptId);
+    if (locality == null) {
+      locality = Locality.OFF_SWITCH;
+    }
+    return locality;
+  }
+
+  private Avataar checkAvataar(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+    Avataar avataar = tip.getTaskAttemptAvataar(taskAttemptId);
+    if (avataar == null) {
+      avataar = Avataar.VIRGIN;
+    }
+    return avataar;
+  }
+
   void killSetupTip(boolean isMap) {
     if (isMap) {
       setup[0].kill();
@@ -3221,6 +3299,16 @@ public class JobInProgress {
       jobtracker.storeCompletedJob(this);
       jobtracker.finalizeJob(this);
 
+    }
+    cleanupJob();
+  }
+  
+  /**
+   * The job is dead. We're now cleaning it, getting rid of job directories and
+   * removing all delegation token etc.
+   */
+  void cleanupJob() {
+    synchronized (this) {
       try {
         // Definitely remove the local-disk copy of the job file
         if (localJobFile != null) {
@@ -3230,7 +3318,17 @@ public class JobInProgress {
 
         Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
         CleanupQueue.getInstance().addToQueue(
-            new PathDeletionContext(tempDir, conf)); 
+            new PathDeletionContext(tempDir, conf));
+
+        // delete the staging area for the job and cancel delegation token
+        String jobTempDir = conf.get("mapreduce.job.dir");
+        if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
+            !conf.getKeepFailedTaskFiles()) {
+          Path jobTempDirPath = new Path(jobTempDir);
+          CleanupQueue.getInstance().addToQueue(
+              new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
+        }
+
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
       }
@@ -3244,11 +3342,6 @@ public class JobInProgress {
       this.runningReduces = null;
     }
     
-    // remove jobs delegation tokens
-    if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
-      DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
-    } // else don't remove it.May be used by spawned tasks
-
     //close the user's FS
     try {
       fs.close();
@@ -3516,31 +3609,6 @@ public class JobInProgress {
   }
 
   /**
-   * generate job token and save it into the file
-   * @throws IOException
-   */
-  private void generateAndStoreTokens() throws IOException {
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
-    if (tokenStorage == null) {
-      tokenStorage = new Credentials();
-    }
-    //create JobToken file and write token to it
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
-        .toString()));
-    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
-        jobtracker.getJobTokenSecretManager());
-    token.setService(identifier.getJobId());
-    
-    TokenCache.setJobToken(token, tokenStorage);
-        
-    // write TokenStorage out
-    tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
-    LOG.info("jobToken generated and stored with users keys in "
-        + keysFile.toUri().getPath());
-  }
-
-  /**
    * Get the level of locality that a given task would have if launched on
    * a particular TaskTracker. Returns 0 if the task has data on that machine,
    * 1 if it has data on the same rack, etc (depending on number of levels in

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for job-level counters
 
 CounterGroupName=                  Job Counters 
@@ -9,6 +22,7 @@ TOTAL_LAUNCHED_REDUCES.name=       Launc
 OTHER_LOCAL_MAPS.name=             Other local map tasks
 DATA_LOCAL_MAPS.name=              Data-local map tasks
 RACK_LOCAL_MAPS.name=              Rack-local map tasks
+NODEGROUP_LOCAL_MAPS.name=         NodeGroup-local map tasks
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)
 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java Fri Jun 21 06:37:27 2013
@@ -82,7 +82,7 @@ public class JobLocalizer {
   private final FileSystem lfs;
   private final List<Path> localDirs;
   private final LocalDirAllocator lDirAlloc;
-  private final JobConf ttConf;
+  protected final JobConf ttConf;
 
   private final String JOBDIR;
   private final String DISTDIR;
@@ -90,7 +90,7 @@ public class JobLocalizer {
   private final String JARDST;
   private final String JOBCONF;
   private final String JOBTOKEN;
-  private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
+  protected static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
 
   public JobLocalizer(JobConf ttConf, String user, String jobid)
       throws IOException {
@@ -108,10 +108,10 @@ public class JobLocalizer {
       throw new IOException("Cannot initialize for null jobid");
     }
     this.jobid = jobid;
-    this.ttConf = ttConf;
-    lfs = FileSystem.getLocal(ttConf).getRaw();
+    this.ttConf = new JobConf(ttConf);
+    lfs = FileSystem.getLocal(this.ttConf).getRaw();
     this.localDirs = createPaths(user, localDirs);
-    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
+    this.ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
     Collections.shuffle(this.localDirs);
     lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
     JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Jun 21 06:37:27 2013
@@ -80,6 +80,12 @@ class JobQueueTaskScheduler extends Task
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+      return null;
+    } 
+
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();
@@ -166,7 +172,8 @@ class JobQueueTaskScheduler extends Task
 
           Task t = null;
           
-          // Try to schedule a node-local or rack-local Map task
+          // Try to schedule a Map task with locality between node-local 
+          // and rack-local
           t = 
             job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                 numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());