You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC

svn commit: r830230 [6/9] - in /hadoop/mapreduce/branches/HDFS-641: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-sche...

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 27 15:43:58 2009
@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
@@ -131,18 +130,27 @@
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  
   // Approximate number of heartbeats that could arrive JobTracker
   // in a second
-  private int NUM_HEARTBEATS_IN_SECOND = 100;
+  private int NUM_HEARTBEATS_IN_SECOND;
+  private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+  private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+  
+  // Scaling factor for heartbeats, used for testing only
+  private float HEARTBEATS_SCALING_FACTOR;
+  private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+  private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+  
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private DNSToSwitchMapping dnsToSwitchMapping;
-  private NetworkTopology clusterMap = new NetworkTopology();
+  NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
-  private final TaskScheduler taskScheduler;
+  final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
@@ -158,7 +166,7 @@
   
   static final Clock DEFAULT_CLOCK = new Clock();
 
-  private JobHistory jobHistory = null;
+  private final JobHistory jobHistory;
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -416,10 +424,11 @@
                 // tracker is lost, and if it is blacklisted, remove 
                 // it from the count of blacklisted trackers in the cluster
                 if (isBlacklisted(trackerName)) {
-                  faultyTrackers.numBlacklistedTrackers -= 1;
+                  faultyTrackers.decrBlackListedTrackers(1);
                 }
                 updateTaskTrackerStatus(trackerName, null);
                 statistics.taskTrackerRemoved(trackerName);
+                getInstrumentation().decTrackers(1);
                 // remove the mapping from the hosts list
                 String hostname = newProfile.getHost();
                 hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -641,7 +650,16 @@
       }        
     }
 
-    
+    private void incrBlackListedTrackers(int count) {
+      numBlacklistedTrackers += count;
+      getInstrumentation().addBlackListedTrackers(count);
+    }
+
+    private void decrBlackListedTrackers(int count) {
+      numBlacklistedTrackers -= count;
+      getInstrumentation().decBlackListedTrackers(count);
+    }
+
     private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
       FaultInfo fi = getFaultInfo(hostName, true);
       boolean blackListed = fi.isBlacklisted();
@@ -800,7 +818,7 @@
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
       }
     }
     
@@ -820,7 +838,7 @@
         }
         uniqueHostsMap.put(hostName,
                            numTrackersOnHost);
-        numBlacklistedTrackers -= numTrackersOnHost;
+        decrBlackListedTrackers(numTrackersOnHost);
       }
     }
 
@@ -1126,12 +1144,12 @@
   ////////////////////////////////////////////////////////////////
   int port;
   String localMachine;
-  private String trackerIdentifier;
+  private final String trackerIdentifier;
   long startTime;
   int totalSubmissions = 0;
   private int totalMapTaskCapacity;
   private int totalReduceTaskCapacity;
-  private HostsFileReader hostsReader;
+  private final HostsFileReader hostsReader;
   
   // JobTracker recovery variables
   private volatile boolean hasRecovered = false;
@@ -1206,6 +1224,10 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
+  private int reservedMapSlots = 0;
+  private int reservedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -1217,9 +1239,9 @@
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
 
-  CompletedJobStatusStore completedJobStatusStore = null;
+  final CompletedJobStatusStore completedJobStatusStore;
   Thread completedJobsStoreThread = null;
-  RecoveryManager recoveryManager;
+  final RecoveryManager recoveryManager;
 
   /**
    * It might seem like a bug to maintain a TreeSet of tasktracker objects,
@@ -1256,7 +1278,7 @@
   static final String SUBDIR = "jobTracker";
   FileSystem fs = null;
   Path systemDir = null;
-  private JobConf conf;
+  JobConf conf;
   private final UserGroupInformation mrOwner;
   private final String supergroup;
 
@@ -1265,7 +1287,7 @@
   long memSizeForMapSlotOnJT;
   long memSizeForReduceSlotOnJT;
 
-  private QueueManager queueManager;
+  private final QueueManager queueManager;
 
   JobTracker(JobConf conf) 
   throws IOException,InterruptedException, LoginException {
@@ -1294,8 +1316,21 @@
     tasktrackerExpiryInterval = 
       conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000);
     retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000);
-    MAX_BLACKLISTS_PER_TRACKER = conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
-    NUM_HEARTBEATS_IN_SECOND = conf.getInt(JT_HEARTBEATS_IN_SECOND, 100);
+    MAX_BLACKLISTS_PER_TRACKER = 
+      conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
+    
+    NUM_HEARTBEATS_IN_SECOND = 
+      conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+    if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+      NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+    }
+    
+    HEARTBEATS_SCALING_FACTOR = 
+      conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, 
+                    DEFAULT_HEARTBEATS_SCALING_FACTOR);
+    if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+      HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+    }
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
@@ -1790,7 +1825,7 @@
    * 
    * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
    */
-  private void removeMarkedTasks(String taskTracker) {
+  void removeMarkedTasks(String taskTracker) {
     // Purge all the 'marked' tasks which were running at taskTracker
     Set<TaskAttemptID> markedTaskSet = 
       trackerToMarkedTasksMap.get(taskTracker);
@@ -2086,7 +2121,7 @@
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTracker taskTracker) {
+  void addNewTracker(TaskTracker taskTracker) {
     TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
@@ -2104,6 +2139,7 @@
       hostnameToTaskTracker.put(hostname, trackers);
     }
     statistics.taskTrackerAdded(status.getTrackerName());
+    getInstrumentation().addTrackers(1);
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
     trackers.add(taskTracker);
@@ -2183,7 +2219,7 @@
   
   // Update the listeners about the job
   // Assuming JobTracker is locked on entry.
-  private void updateJobInProgressListeners(JobChangeEvent event) {
+  void updateJobInProgressListeners(JobChangeEvent event) {
     for (JobInProgressListener listener : jobInProgressListeners) {
       listener.jobUpdated(event);
     }
@@ -2347,15 +2383,16 @@
   
   /**
    * Calculates next heartbeat interval using cluster size.
-   * Heartbeat interval is incremented 1second for every 50 nodes. 
+   * Heartbeat interval is incremented by 1 second for every 100 nodes by default. 
    * @return next heartbeat interval.
    */
   public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
-                                (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       NUM_HEARTBEATS_IN_SECOND)),
+                                (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+                                      Math.ceil((double)clusterSize / 
+                                                NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }
@@ -2393,13 +2430,17 @@
    * @param status The new status for the task tracker
    * @return Was an old status found?
    */
-  private boolean updateTaskTrackerStatus(String trackerName,
+  boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
     TaskTracker tt = getTaskTracker(trackerName);
     TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+      getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+      getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -2422,6 +2463,10 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
+      getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+      getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -2489,6 +2534,25 @@
     return oldStatus != null;
   }
   
+  // Increment the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void incrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots += reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots += reservedSlots;
+    }
+  }
+
+  // Decrement the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void decrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots -= reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots -= reservedSlots;
+    }
+  }
   
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
     TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -2501,7 +2565,7 @@
   /**
    * Process incoming heartbeat messages from the task trackers.
    */
-  private synchronized boolean processHeartbeat(
+  synchronized boolean processHeartbeat(
                                  TaskTrackerStatus trackerStatus, 
                                  boolean initialContact) {
     String trackerName = trackerStatus.getTrackerName();
@@ -2530,7 +2594,7 @@
           // if this is lost tracker that came back now, and if it blacklisted
           // increment the count of blacklisted trackers in the cluster
           if (isBlacklisted(trackerName)) {
-            faultyTrackers.numBlacklistedTrackers += 1;
+            faultyTrackers.incrBlackListedTrackers(1);
           }
           addNewTracker(taskTracker);
         }
@@ -2547,8 +2611,7 @@
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)
    */
-  private synchronized List<TaskTrackerAction> getTasksToKill(
-                                                              String taskTracker) {
+  synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
@@ -2625,7 +2688,7 @@
   /**
    * A tracker wants to know if any of its Tasks can be committed 
    */
-  private synchronized List<TaskTrackerAction> getTasksToSave(
+  synchronized List<TaskTrackerAction> getTasksToSave(
                                                  TaskTrackerStatus tts) {
     List<TaskStatus> taskStatuses = tts.getTaskReports();
     if (taskStatuses != null) {
@@ -2928,9 +2991,12 @@
   }
   
   public synchronized ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
-      totalReduceTaskCapacity, taskTrackers.size() - 
-      getBlacklistedTrackerCount(), 
+    return new ClusterMetrics(totalMaps,
+      totalReduces, occupiedMapSlots, occupiedReduceSlots,
+      reservedMapSlots, reservedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity,
+      totalSubmissions,
+      taskTrackers.size() - getBlacklistedTrackerCount(), 
       getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
   }
 
@@ -3744,12 +3810,13 @@
   }
 
   // main decommission
-  private synchronized void decommissionNodes(Set<String> hosts) 
+  synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
     LOG.info("Decommissioning " + hosts.size() + " nodes");
     // create a list of tracker hostnames
     synchronized (taskTrackers) {
       synchronized (trackerExpiryQueue) {
+        int trackersDecommissioned = 0;
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -3758,11 +3825,14 @@
               LOG.info("Decommission: Losing tracker " + tracker + 
                        " on host " + host);
               lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+              updateTaskTrackerStatus(
+                tracker.getStatus().getTrackerName(), null);
             }
+            trackersDecommissioned += trackers.size();
           }
           LOG.info("Host " + host + " is ready for decommissioning");
         }
+        getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
       }
     }
   }
@@ -4090,7 +4160,108 @@
   void incrementFaults(String hostName) {
     faultyTrackers.incrementFaults(hostName);
   }
-  
+
+  JobTracker(JobConf conf, Clock clock, boolean ignoredForSimulation) 
+  throws IOException {
+    this.clock = clock;
+    this.conf = conf;
+    trackerIdentifier = getDateFormat().format(new Date());
+
+    if (fs == null) {
+      fs = FileSystem.get(conf);
+    }
+    
+    tasktrackerExpiryInterval = 
+      conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
+    retiredJobsCacheSize = 
+      conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+
+    // min time before retire
+    MAX_BLACKLISTS_PER_TRACKER = 
+        conf.getInt("mapred.max.tracker.blacklists", 4);
+    NUM_HEARTBEATS_IN_SECOND = 
+        conf.getInt("mapred.heartbeats.in.second", 100);
+    
+    try {
+      mrOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    
+    this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+        conf.get("mapred.hosts.exclude", ""));
+    // queue manager
+    Configuration queuesConf = new Configuration(this.conf);
+    queueManager = new QueueManager(queuesConf);
+
+    // Create the scheduler
+    Class<? extends TaskScheduler> schedulerClass
+      = conf.getClass("mapred.jobtracker.taskScheduler",
+          JobQueueTaskScheduler.class, TaskScheduler.class);
+    taskScheduler = 
+      (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
+    
+    // Set ports, start RPC servers, setup security policy etc.
+    InetSocketAddress addr = getAddress(conf);
+    this.localMachine = addr.getHostName();
+    this.port = addr.getPort();
+
+    // Create the jetty server
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
+        conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+    String infoBindAddress = infoSocAddr.getHostName();
+    int tmpInfoPort = infoSocAddr.getPort();
+    this.startTime = clock.getTime();
+    infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
+        tmpInfoPort == 0, conf);
+    infoServer.setAttribute("job.tracker", this);
+    
+    // initialize history parameters.
+    String historyLogDir = null;
+    FileSystem historyFS = null;
+
+    jobHistory = new JobHistory();
+    jobHistory.init(this, conf, this.localMachine, this.startTime);
+    jobHistory.initDone(conf, fs);
+    historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
+    infoServer.setAttribute("historyLogDir", historyLogDir);
+    historyFS = new Path(historyLogDir).getFileSystem(conf);
+
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
+    infoServer.start();
+    this.infoPort = this.infoServer.getPort();
+
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+      getInstrumentationClass(conf);
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+      tmp = c.newInstance(this, conf);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      tmp = new JobTrackerMetricsInst(this, conf);
+    }
+    myInstrumentation = tmp;
+    
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+    
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+        NetworkTopology.DEFAULT_HOST_LEVEL);
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
+  }
+
   /**
    * Get the path of the locally stored job file
    * @param jobId id of the job

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Tue Oct 27 15:43:58 2009
@@ -84,4 +84,80 @@
 
   public void decBlackListedReduceSlots(int slots)
   { }
+
+  public void addReservedMapSlots(int slots)
+  { }
+
+  public void decReservedMapSlots(int slots)
+  { }
+
+  public void addReservedReduceSlots(int slots)
+  { }
+
+  public void decReservedReduceSlots(int slots)
+  { }
+
+  public void addOccupiedMapSlots(int slots)
+  { }
+
+  public void decOccupiedMapSlots(int slots)
+  { }
+
+  public void addOccupiedReduceSlots(int slots)
+  { }
+
+  public void decOccupiedReduceSlots(int slots)
+  { }
+
+  public void failedJob(JobConf conf, JobID id) 
+  { }
+
+  public void killedJob(JobConf conf, JobID id) 
+  { }
+
+  public void addPrepJob(JobConf conf, JobID id) 
+  { }
+  
+  public void decPrepJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void decRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningMaps(JobID id, int task)
+  { }
+
+  public void decRunningMaps(JobID id, int task) 
+  { }
+
+  public void addRunningReduces(JobID id, int task)
+  { }
+
+  public void decRunningReduces(JobID id, int task)
+  { }
+
+  public void killedMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void killedReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void addTrackers(int trackers)
+  { }
+
+  public void decTrackers(int trackers)
+  { }
+
+  public void addBlackListedTrackers(int trackers)
+  { }
+
+  public void decBlackListedTrackers(int trackers)
+  { }
+
+  public void setDecommissionedTrackers(int trackers)
+  { }  
+
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Tue Oct 27 15:43:58 2009
@@ -22,8 +22,6 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
   private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@
   private int numBlackListedMapSlots = 0;
   private int numBlackListedReduceSlots = 0;
 
+  private int numReservedMapSlots = 0;
+  private int numReservedReduceSlots = 0;
+  private int numOccupiedMapSlots = 0;
+  private int numOccupiedReduceSlots = 0;
+  
+  private int numJobsFailed = 0;
+  private int numJobsKilled = 0;
+  
+  private int numJobsPreparing = 0;
+  private int numJobsRunning = 0;
+  
+  private int numRunningMaps = 0;
+  private int numRunningReduces = 0;
+  
+  private int numMapTasksKilled = 0;
+  private int numReduceTasksKilled = 0;
+
+  private int numTrackers = 0;
+  private int numTrackersBlackListed = 0;
+  private int numTrackersDecommissioned = 0;
+  
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
     String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
       metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
       metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+      
+      metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+      metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+      metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+      metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+      
+      metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+      metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+      
+      metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+      metricsRecord.incrMetric("jobs_running", numJobsRunning);
+      
+      metricsRecord.incrMetric("running_maps", numRunningMaps);
+      metricsRecord.incrMetric("running_reduces", numRunningReduces);
+      
+      metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+      metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+      metricsRecord.incrMetric("trackers", numTrackers);
+      metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+      metricsRecord.setMetric("trackers_decommissioned", 
+          numTrackersDecommissioned);
 
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@
       numWaitingReduces = 0;
       numBlackListedMapSlots = 0;
       numBlackListedReduceSlots = 0;
+      
+      numReservedMapSlots = 0;
+      numReservedReduceSlots = 0;
+      numOccupiedMapSlots = 0;
+      numOccupiedReduceSlots = 0;
+      
+      numJobsFailed = 0;
+      numJobsKilled = 0;
+      
+      numJobsPreparing = 0;
+      numJobsRunning = 0;
+      
+      numRunningMaps = 0;
+      numRunningReduces = 0;
+      
+      numMapTasksKilled = 0;
+      numReduceTasksKilled = 0;
+
+      numTrackers = 0;
+      numTrackersBlackListed = 0;
     }
     metricsRecord.update();
 
@@ -166,12 +227,12 @@
   }
 
   @Override
-  public void setMapSlots(int slots) {
+  public synchronized void setMapSlots(int slots) {
     numMapSlots = slots;
   }
 
   @Override
-  public void setReduceSlots(int slots) {
+  public synchronized void setReduceSlots(int slots) {
     numReduceSlots = slots;
   }
 
@@ -194,4 +255,154 @@
   public synchronized void decBlackListedReduceSlots(int slots){
     numBlackListedReduceSlots -= slots;
   }
+
+  @Override
+  public synchronized void addReservedMapSlots(int slots)
+  { 
+    numReservedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedMapSlots(int slots)
+  {
+    numReservedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void failedJob(JobConf conf, JobID id) 
+  {
+    numJobsFailed++;
+  }
+
+  @Override
+  public synchronized void killedJob(JobConf conf, JobID id) 
+  {
+    numJobsKilled++;
+  }
+
+  @Override
+  public synchronized void addPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing++;
+  }
+
+  @Override
+  public synchronized void decPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing--;
+  }
+
+  @Override
+  public synchronized void addRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning++;
+  }
+
+  @Override
+  public synchronized void decRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning--;
+  }
+
+  @Override
+  public synchronized void addRunningMaps(JobID id, int task)
+  {
+    numRunningMaps += task;
+  }
+
+  @Override
+  public synchronized void decRunningMaps(JobID id, int task) 
+  {
+    numRunningMaps -= task;
+  }
+
+  @Override
+  public synchronized void addRunningReduces(JobID id, int task)
+  {
+    numRunningReduces += task;
+  }
+
+  @Override
+  public synchronized void decRunningReduces(JobID id, int task)
+  {
+    numRunningReduces -= task;
+  }
+
+  @Override
+  public synchronized void killedMap(TaskAttemptID taskAttemptID)
+  {
+    numMapTasksKilled++;
+  }
+
+  @Override
+  public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+  {
+    numReduceTasksKilled++;
+  }
+
+  @Override
+  public synchronized void addTrackers(int trackers)
+  {
+    numTrackers += trackers;
+  }
+
+  @Override
+  public synchronized void decTrackers(int trackers)
+  {
+    numTrackers -= trackers;
+  }
+
+  @Override
+  public synchronized void addBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed += trackers;
+  }
+
+  @Override
+  public synchronized void decBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed -= trackers;
+  }
+
+  @Override
+  public synchronized void setDecommissionedTrackers(int trackers)
+  {
+    numTrackersDecommissioned = trackers;
+  }  
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Oct 27 15:43:58 2009
@@ -69,6 +69,7 @@
     return ClientProtocol.versionID;
   }
   
+  @SuppressWarnings("unchecked")
   static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
       throws Exception {
     JobConf jobConf = jContext.getJobConf();
@@ -309,7 +310,7 @@
           }
         }
         // delete the temporary directory in output directory
-        outputCommitter.cleanupJob(jContext);
+        outputCommitter.commitJob(jContext);
         status.setCleanupProgress(1.0f);
 
         if (killed) {
@@ -322,7 +323,8 @@
 
       } catch (Throwable t) {
         try {
-          outputCommitter.cleanupJob(jContext);
+          outputCommitter.abortJob(jContext, 
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
         }
@@ -505,7 +507,8 @@
   }
   
   public ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+    return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
+      0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
   public State getJobTrackerState() throws IOException, InterruptedException {

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -71,10 +71,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or 
+   *                 {@link #abortJob(JobContext, int)} instead.
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
 
   /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException 
+   */
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final runstate as {@link JobStatus#FAILED} or 
+   * {@link JobStatus#KILLED}
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @param status final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
    * Sets up output for the task.
    * 
    * @param taskContext Context of the task whose output is being written.
@@ -128,8 +156,12 @@
    * This method implements the new interface by calling the old method. Note
    * that the input types are different between the new and old apis and this
    * is a bridge between the two.
+   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
+   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
+   *             instead.
    */
   @Override
+  @Deprecated
   public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                                ) throws IOException {
     cleanupJob((JobContext) context);
@@ -141,6 +173,33 @@
    * is a bridge between the two.
    */
   @Override
+  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+                             ) throws IOException {
+    commitJob((JobContext) context);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
+		                   org.apache.hadoop.mapreduce.JobStatus.State runState) 
+  throws IOException {
+    int state = JobStatus.getOldNewJobRunState(runState);
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException ("Invalid job run state : " + runState.name());
+    }
+    abortJob((JobContext) context, state);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final 
   void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/OutputLogFilter.java Tue Oct 27 15:43:58 2009
@@ -27,9 +27,14 @@
  * This can be used to list paths of output directory as follows:
  *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  *                                   new OutputLogFilter()));
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter} 
+ *   instead.
  */
 public class OutputLogFilter implements PathFilter {
+  private static final PathFilter LOG_FILTER = 
+    new Utils.OutputFileUtils.OutputLogFilter();
   public boolean accept(Path path) {
-    return !(path.toString().contains("_logs"));
+    return LOG_FILTER.accept(path);
   }
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/Task.java Tue Oct 27 15:43:58 2009
@@ -42,9 +42,11 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
@@ -129,6 +131,7 @@
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
+  protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
   protected boolean jobSetup = false;
   protected boolean taskCleanup = false;
@@ -322,6 +325,14 @@
     return jobCleanup;
   }
 
+  boolean isJobAbortTask() {
+    // the task is an abort task if its marked for cleanup and the final 
+    // expected state is either failed or killed.
+    return isJobCleanupTask() 
+           && (jobRunStateForCleanup == JobStatus.State.KILLED 
+               || jobRunStateForCleanup == JobStatus.State.FAILED);
+  }
+  
   boolean isJobSetupTask() {
     return jobSetup;
   }
@@ -334,6 +345,14 @@
     jobCleanup = true; 
   }
 
+  /**
+   * Sets the task to do job abort in the cleanup.
+   * @param status the final runstate of the job. 
+   */
+  void setJobCleanupTaskState(JobStatus.State status) {
+    jobRunStateForCleanup = status;
+  }
+  
   boolean isMapOrReduce() {
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
@@ -362,6 +381,9 @@
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
+    if (jobCleanup) {
+      WritableUtils.writeEnum(out, jobRunStateForCleanup);
+    }
     out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
@@ -379,6 +401,10 @@
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
+    if (jobCleanup) {
+      jobRunStateForCleanup = 
+        WritableUtils.readEnum(in, JobStatus.State.class);
+    }
     jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
     taskCleanup = in.readBoolean();
@@ -872,7 +898,27 @@
     getProgress().setStatus("cleanup");
     statusUpdate(umbilical);
     // do the cleanup
-    committer.cleanupJob(jobContext);
+    LOG.info("Cleaning up job");
+    if (jobRunStateForCleanup == JobStatus.State.FAILED 
+        || jobRunStateForCleanup == JobStatus.State.KILLED) {
+      LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name());
+      if (conf.getUseNewMapper()) {
+        committer.abortJob(jobContext, jobRunStateForCleanup);
+      } else {
+        org.apache.hadoop.mapred.OutputCommitter oldCommitter = 
+          (org.apache.hadoop.mapred.OutputCommitter)committer;
+        oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
+      }
+    } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+      LOG.info("Committing job");
+      committer.commitJob(jobContext);
+    } else {
+      throw new IOException("Invalid state of the job for cleanup. State found "
+                            + jobRunStateForCleanup + " expecting "
+                            + JobStatus.State.SUCCEEDED + ", " 
+                            + JobStatus.State.FAILED + " or "
+                            + JobStatus.State.KILLED);
+    }
     done(umbilical, reporter);
   }
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Oct 27 15:43:58 2009
@@ -113,7 +113,7 @@
   /**
    * Map from taskId -> TaskStatus
    */
-  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
+  TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
   // Map from taskId -> TaskTracker Id, 
@@ -985,6 +985,8 @@
   public Task addRunningTask(TaskAttemptID taskid, 
                              String taskTracker,
                              boolean taskCleanup) {
+    // 1 slot is enough for taskCleanup task
+    int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
     // create the task
     Task t = null;
     if (isMapTask()) {
@@ -999,9 +1001,9 @@
         split = new BytesWritable();
       }
       t = new MapTask(jobFile, taskid, partition, splitClass, split,
-                      numSlotsRequired);
+                      numSlotsNeeded);
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
     }
     if (jobCleanup) {
       t.setJobCleanupTask();

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Oct 27 15:43:58 2009
@@ -59,13 +59,9 @@
   private static final File LOG_DIR = 
     new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
   
+  // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
   static {
-    try {
-      localFS = FileSystem.getLocal(new Configuration());
-    } catch (IOException ioe) {
-      LOG.warn("Getting local file system failed.");
-    }
     if (!LOG_DIR.exists()) {
       boolean b = LOG_DIR.mkdirs();
       if (!b) {
@@ -200,6 +196,10 @@
     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+    if (localFS == null) {// set localFS once
+      localFS = FileSystem.getLocal(new Configuration());
+    }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
   private static void resetPrevLengths(TaskAttemptID firstTaskid) {

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Oct 27 15:43:58 2009
@@ -51,6 +51,10 @@
   private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
   private List<TaskAttemptID> tasksToBeRemoved;
 
+  private static final String MEMORY_USAGE_STRING =
+    "Memory usage of ProcessTree %s for task-id %s : %d bytes, " +
+      "limit : %d bytes";
+  
   public TaskMemoryManagerThread(TaskTracker taskTracker) {
     this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
       taskTracker.getJobConf().getLong(
@@ -209,8 +213,8 @@
           // are processes more than 1 iteration old.
           long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
           long limit = ptInfo.getMemLimit();
-          LOG.info("Memory usage of ProcessTree " + pId + " :"
-              + currentMemUsage + "bytes. Limit : " + limit + "bytes");
+          LOG.info(String.format(MEMORY_USAGE_STRING, 
+                                pId, tid.toString(), currentMemUsage, limit));
 
           if (isProcessTreeOverLimit(tid.toString(), currentMemUsage, 
                                       curMemUsageOfAgedProcesses, limit)) {

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 27 15:43:58 2009
@@ -143,6 +143,7 @@
    *             set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
    *             {@link JobConf#MAPRED_REDUCE_TASK_ENV}
    */
+  @Deprecated
   public String getChildEnv(JobConf jobConf) {
     return jobConf.get(JobConf.MAPRED_TASK_ENV);
   }
@@ -209,7 +210,8 @@
           stderr);
 
       Map<String, String> env = new HashMap<String, String>();
-      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+                                   taskid, logSize);
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
@@ -253,7 +255,12 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tip.reportTaskFinished();
+
+      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+      // *false* since the task has either
+      // a) SUCCEEDED - which means commit has been done
+      // b) FAILED - which means we do not need to commit
+      tip.reportTaskFinished(false);
     }
   }
 
@@ -478,6 +485,7 @@
   }
 
   /**
+   * sets the environment variables needed for task jvm and its children.
    * @param errorInfo
    * @param workDir
    * @param env
@@ -485,7 +493,7 @@
    * @throws Throwable
    */
   private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
-      Map<String, String> env)
+      Map<String, String> env, TaskAttemptID taskid, long logSize)
       throws Throwable {
     StringBuffer ldLibraryPath = new StringBuffer();
     ldLibraryPath.append(workDir.toString());
@@ -497,6 +505,18 @@
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
     // add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf);
     if (mapredChildEnv != null && mapredChildEnv.length() > 0) {

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Oct 27 15:43:58 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
 /**************************************************
  * Describes the current status of a task.  This is
  * not intended to be a comprehensive piece of data.
@@ -132,11 +133,21 @@
   }
 
   /**
-   * Sets finishTime. 
+   * Sets finishTime for the task status if and only if the
+   * start time is set and passed finish time is greater than
+   * zero.
+   * 
    * @param finishTime finish time of task.
    */
   void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
+    if(this.getStartTime() > 0 && finishTime > 0) {
+      this.finishTime = finishTime;
+    } else {
+      //Using String utils to get the stack trace.
+      LOG.error("Trying to set finish time for task " + taskid + 
+          " when no start time is set, stackTrace is : " + 
+      		StringUtils.stringifyException(new Exception()));
+    }
   }
   /**
    * Get shuffle finish time for the task. If shuffle finish time was 
@@ -201,11 +212,20 @@
   }
 
   /**
-   * Set startTime of the task.
+   * Set startTime of the task if start time is greater than zero.
    * @param startTime start time
    */
   void setStartTime(long startTime) {
-    this.startTime = startTime;
+    //Making the assumption of passed startTime to be a positive
+    //long value explicit.
+    if (startTime > 0) {
+      this.startTime = startTime;
+    } else {
+      //Using String utils to get the stack trace.
+      LOG.error("Trying to set illegal startTime for task : " + taskid +
+          ".Stack trace is : " +
+          StringUtils.stringifyException(new Exception()));
+    }
   }
   /**
    * Get current phase of this task. Phase.Map in case of map tasks, 
@@ -326,11 +346,11 @@
 
     setDiagnosticInfo(status.getDiagnosticInfo());
     
-    if (status.getStartTime() != 0) {
-      this.startTime = status.getStartTime(); 
+    if (status.getStartTime() > 0) {
+      this.setStartTime(status.getStartTime()); 
     }
-    if (status.getFinishTime() != 0) {
-      this.finishTime = status.getFinishTime(); 
+    if (status.getFinishTime() > 0) {
+      this.setFinishTime(status.getFinishTime()); 
     }
     
     this.phase = status.getPhase();
@@ -359,8 +379,8 @@
     setProgress(progress);
     setStateString(state);
     setPhase(phase);
-    if (finishTime != 0) {
-      this.finishTime = finishTime; 
+    if (finishTime > 0) {
+      setFinishTime(finishTime); 
     }
   }
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 27 15:43:58 2009
@@ -225,6 +225,14 @@
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+  
+  // Performance-related config knob to send an out-of-band heartbeat
+  // on task completion
+  private volatile boolean oobHeartbeatOnTaskCompletion;
+  
+  // Track number of completed tasks to send an out-of-band heartbeat
+  private IntWritable finishedCount = new IntWritable(0);
+  
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   CleanupQueue directoryCleanupThread;
@@ -636,6 +644,9 @@
     if (shouldStartHealthMonitor(this.fConf)) {
       startHealthMonitor(this.fConf);
     }
+    
+    oobHeartbeatOnTaskCompletion = 
+      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -1176,8 +1187,14 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time
-          Thread.sleep(waitTime);
+          // sleeps for the wait time or
+          // until there are empty slots to schedule tasks
+          synchronized (finishedCount) {
+            if (finishedCount.get() == 0) {
+              finishedCount.wait(waitTime);
+            }
+            finishedCount.set(0);
+          }
         }
 
         // If the TaskTracker is just starting up:
@@ -1867,6 +1884,19 @@
     }
   }
 
+  /** 
+   * Notify the tasktracker to send an out-of-band heartbeat.
+   */
+  private void notifyTTAboutTaskCompletion() {
+    if (oobHeartbeatOnTaskCompletion) {
+      synchronized (finishedCount) {
+        int value = finishedCount.get();
+        finishedCount.set(value+1);
+        finishedCount.notify();
+      }
+    }
+  }
+  
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -2185,9 +2215,21 @@
       return wasKilled;
     }
 
-    void reportTaskFinished() {
-      taskFinished();
-      releaseSlot();
+    /**
+     * A task is reporting in as 'done'.
+     * 
+     * We need to notify the tasktracker to send an out-of-band heartbeat.
+     * If isn't <code>commitPending</code>, we need to finalize the task
+     * and release the slot it's occupied.
+     * 
+     * @param commitPending is the task-commit pending?
+     */
+    void reportTaskFinished(boolean commitPending) {
+      if (!commitPending) {
+        taskFinished();
+        releaseSlot();
+      }
+      notifyTTAboutTaskCompletion();
     }
 
     /* State changes:
@@ -2487,8 +2529,10 @@
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      taskStatus.setFinishTime(System.currentTimeMillis());
       removeFromMemoryManager(task.getTaskID());
       releaseSlot();
+      notifyTTAboutTaskCompletion();
     }
     
     private synchronized void releaseSlot() {
@@ -2817,9 +2861,7 @@
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      if (!commitPending) {
-        tip.reportTaskFinished();
-      }
+      tip.reportTaskFinished(commitPending);
     } else {
       LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Tue Oct 27 15:43:58 2009
@@ -382,7 +382,7 @@
                                           JobConf conf, 
                                           Class<InterfaceType> cls
                                          ) throws ClassNotFoundException {
-    return conf.getClassByName((String) cl.getOptionValue(key)).asSubclass(cls);
+    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
   }
 
   @Override
@@ -420,15 +420,14 @@
       JobConf job = new JobConf(getConf());
       
       if (results.hasOption("input")) {
-        FileInputFormat.setInputPaths(job, 
-                          (String) results.getOptionValue("input"));
+        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
       }
       if (results.hasOption("output")) {
         FileOutputFormat.setOutputPath(job, 
-          new Path((String) results.getOptionValue("output")));
+          new Path(results.getOptionValue("output")));
       }
       if (results.hasOption("jar")) {
-        job.setJar((String) results.getOptionValue("jar"));
+        job.setJar(results.getOptionValue("jar"));
       }
       if (results.hasOption("inputformat")) {
         setIsJavaRecordReader(job, true);
@@ -451,7 +450,7 @@
         job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
       }
       if (results.hasOption("reduces")) {
-        job.setNumReduceTasks(Integer.parseInt((String) 
+        job.setNumReduceTasks(Integer.parseInt( 
                                            results.getOptionValue("reduces")));
       }
       if (results.hasOption("writer")) {
@@ -461,18 +460,18 @@
       }
       
       if (results.hasOption("lazyOutput")) {
-        if (Boolean.parseBoolean((String)results.getOptionValue("lazyOutput"))) {
+        if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
           LazyOutputFormat.setOutputFormatClass(job,
               job.getOutputFormat().getClass());
         }
       }
       
       if (results.hasOption("program")) {
-        setExecutable(job, (String) results.getOptionValue("program"));
+        setExecutable(job, results.getOptionValue("program"));
       }
       if (results.hasOption("jobconf")) {
         LOG.warn("-jobconf option is deprecated, please use -D instead.");
-        String options = (String)results.getOptionValue("jobconf");
+        String options = results.getOptionValue("jobconf");
         StringTokenizer tokenizer = new StringTokenizer(options, ",");
         while (tokenizer.hasMoreTokens()) {
           String keyVal = tokenizer.nextToken().trim();

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Tue Oct 27 15:43:58 2009
@@ -35,11 +35,17 @@
  *   Number of blacklisted and decommissioned trackers.  
  *   </li>
  *   <li>
- *   Task capacity of the cluster. 
+ *   Slot capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently occupied/reserved map & reduce slots.
  *   </li>
  *   <li>
  *   The number of currently running map & reduce tasks.
  *   </li>
+ *   <li>
+ *   The number of job submissions.
+ *   </li>
  * </ol></p>
  * 
  * <p>Clients can query for the latest <code>ClusterMetrics</code>, via 
@@ -48,53 +54,147 @@
  * @see Cluster
  */
 public class ClusterMetrics implements Writable {
-  int runningMaps;
-  int runningReduces;
-  int mapSlots;
-  int reduceSlots;
-  int numTrackers;
-  int numBlacklistedTrackers;
-  int numDecommissionedTrackers;
+  private int runningMaps;
+  private int runningReduces;
+  private int occupiedMapSlots;
+  private int occupiedReduceSlots;
+  private int reservedMapSlots;
+  private int reservedReduceSlots;
+  private int totalMapSlots;
+  private int totalReduceSlots;
+  private int totalJobSubmissions;
+  private int numTrackers;
+  private int numBlacklistedTrackers;
+  private int numDecommissionedTrackers;
 
   public ClusterMetrics() {
   }
   
-  public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, 
-    int reduceSlots, int numTrackers, int numBlacklistedTrackers,
-    int numDecommisionedNodes) {
+  public ClusterMetrics(int runningMaps, int runningReduces,
+      int occupiedMapSlots, int occupiedReduceSlots,
+      int reservedMapSlots, int reservedReduceSlots,
+      int mapSlots, int reduceSlots, 
+      int totalJobSubmissions,
+      int numTrackers, int numBlacklistedTrackers,
+      int numDecommissionedNodes) {
     this.runningMaps = runningMaps;
     this.runningReduces = runningReduces;
-    this.mapSlots = mapSlots;
-    this.reduceSlots = reduceSlots;
+    this.occupiedMapSlots = occupiedMapSlots;
+    this.occupiedReduceSlots = occupiedReduceSlots;
+    this.reservedMapSlots = reservedMapSlots;
+    this.reservedReduceSlots = reservedReduceSlots;
+    this.totalMapSlots = mapSlots;
+    this.totalReduceSlots = reduceSlots;
+    this.totalJobSubmissions = totalJobSubmissions;
     this.numTrackers = numTrackers;
     this.numBlacklistedTrackers = numBlacklistedTrackers;
-    this.numDecommissionedTrackers = numDecommisionedNodes;
+    this.numDecommissionedTrackers = numDecommissionedNodes;
+  }
+
+  /**
+   * Get the number of running map tasks in the cluster.
+   * 
+   * @return running maps
+   */
+  public int getRunningMaps() {
+    return runningMaps;
   }
   
+  /**
+   * Get the number of running reduce tasks in the cluster.
+   * 
+   * @return running reduces
+   */
+  public int getRunningReduces() {
+    return runningReduces;
+  }
+  
+  /**
+   * Get number of occupied map slots in the cluster.
+   * 
+   * @return occupied map slot count
+   */
   public int getOccupiedMapSlots() { 
-    return runningMaps;
+    return occupiedMapSlots;
   }
   
+  /**
+   * Get the number of occupied reduce slots in the cluster.
+   * 
+   * @return occupied reduce slot count
+   */
   public int getOccupiedReduceSlots() { 
-    return runningReduces; 
+    return occupiedReduceSlots; 
   }
-  
+
+  /**
+   * Get number of reserved map slots in the cluster.
+   * 
+   * @return reserved map slot count
+   */
+  public int getReservedMapSlots() { 
+    return reservedMapSlots;
+  }
+  
+  /**
+   * Get the number of reserved reduce slots in the cluster.
+   * 
+   * @return reserved reduce slot count
+   */
+  public int getReservedReduceSlots() { 
+    return reservedReduceSlots; 
+  }
+
+  /**
+   * Get the total number of map slots in the cluster.
+   * 
+   * @return map slot capacity
+   */
   public int getMapSlotCapacity() {
-    return mapSlots;
+    return totalMapSlots;
   }
   
+  /**
+   * Get the total number of reduce slots in the cluster.
+   * 
+   * @return reduce slot capacity
+   */
   public int getReduceSlotCapacity() {
-    return reduceSlots;
+    return totalReduceSlots;
   }
   
+  /**
+   * Get the total number of job submissions in the cluster.
+   * 
+   * @return total number of job submissions
+   */
+  public int getTotalJobSubmissions() {
+    return totalJobSubmissions;
+  }
+  
+  /**
+   * Get the number of active trackers in the cluster.
+   * 
+   * @return active tracker count.
+   */
   public int getTaskTrackerCount() {
     return numTrackers;
   }
   
+  /**
+   * Get the number of blacklisted trackers in the cluster.
+   * 
+   * @return blacklisted tracker count
+   */
   public int getBlackListedTaskTrackerCount() {
     return numBlacklistedTrackers;
   }
   
+  /**
+   * Get the number of decommissioned trackers in the cluster.
+   * 
+   * @return decommissioned tracker count
+   */
   public int getDecommissionedTaskTrackerCount() {
     return numDecommissionedTrackers;
   }
@@ -103,8 +203,13 @@
   public void readFields(DataInput in) throws IOException {
     runningMaps = in.readInt();
     runningReduces = in.readInt();
-    mapSlots = in.readInt();
-    reduceSlots = in.readInt();
+    occupiedMapSlots = in.readInt();
+    occupiedReduceSlots = in.readInt();
+    reservedMapSlots = in.readInt();
+    reservedReduceSlots = in.readInt();
+    totalMapSlots = in.readInt();
+    totalReduceSlots = in.readInt();
+    totalJobSubmissions = in.readInt();
     numTrackers = in.readInt();
     numBlacklistedTrackers = in.readInt();
     numDecommissionedTrackers = in.readInt();
@@ -114,8 +219,13 @@
   public void write(DataOutput out) throws IOException {
     out.writeInt(runningMaps);
     out.writeInt(runningReduces);
-    out.writeInt(mapSlots);
-    out.writeInt(reduceSlots);
+    out.writeInt(occupiedMapSlots);
+    out.writeInt(occupiedReduceSlots);
+    out.writeInt(reservedMapSlots);
+    out.writeInt(reservedReduceSlots);
+    out.writeInt(totalMapSlots);
+    out.writeInt(totalReduceSlots);
+    out.writeInt(totalJobSubmissions);
     out.writeInt(numTrackers);
     out.writeInt(numBlacklistedTrackers);
     out.writeInt(numDecommissionedTrackers);

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Oct 27 15:43:58 2009
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.net.URI;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +33,8 @@
  * A read-only view of the job that is provided to the tasks while they
  * are running.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface JobContext {
   // Put all of the attribute names in here so that Job and JobContext are
   // consistent.

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/MapContext.java Tue Oct 27 15:43:58 2009
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapreduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * The context that is given to the {@link Mapper}.
  * @param <KEYIN> the key input type to the Mapper
@@ -25,6 +28,8 @@
  * @param <KEYOUT> the key output type from the Mapper
  * @param <VALUEOUT> the value output type from the Mapper
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
-
 /**
  * <code>OutputCommitter</code> describes the commit of task output for a 
  * Map-Reduce job.
@@ -69,9 +68,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or
+   *                 {@link #abortJob(JobContext, JobStatus.State)} instead.
+   */
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
+
+  /**
+   * For committing job's output after successful job completion. Note that this
+   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
 
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with final runstate as {@link JobStatus.State#FAILED} or 
+   * {@link JobStatus.State#KILLED}.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   * @param state final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, JobStatus.State state) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
   /**
    * Sets up output for the task.
    * 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Tue Oct 27 15:43:58 2009
@@ -21,6 +21,9 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * The context passed to the {@link Reducer}.
  * @param <KEYIN> the class of the input keys
@@ -28,6 +31,8 @@
  * @param <KEYOUT> the class of the output keys
  * @param <VALUEOUT> the class of the output values
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Tue Oct 27 15:43:58 2009
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.mapreduce;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * The context for task attempts.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface TaskAttemptContext extends JobContext, Progressable {
 
   /**

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Tue Oct 27 15:43:58 2009
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * A context object that allows input and output from the task. It is only
  * supplied to the {@link Mapper} or {@link Reducer}.
@@ -28,6 +31,8 @@
  * @param <KEYOUT> the output key type for the task
  * @param <VALUEOUT> the output value type for the task
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
        extends TaskAttemptContext {
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Oct 27 15:43:58 2009
@@ -199,7 +199,7 @@
       Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
     return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
-        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
         honorSymLinkConf);
   }
 
@@ -250,7 +250,35 @@
   @Deprecated
   public static void releaseCache(URI cache, Configuration conf)
       throws IOException {
-    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+	// find the timestamp of the uri
+    URI[] archives = DistributedCache.getCacheArchives(conf);
+    URI[] files = DistributedCache.getCacheFiles(conf);
+    String[] archivesTimestamps =
+          DistributedCache.getArchiveTimestamps(conf);
+    String[] filesTimestamps =
+          DistributedCache.getFileTimestamps(conf);
+    String timestamp = null;
+    if (archives != null) {
+      for (int i = 0; i < archives.length; i++) {
+        if (archives[i].equals(cache)) {
+          timestamp = archivesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null && files != null) {
+      for (int i = 0; i < files.length; i++) {
+        if (files[i].equals(cache)) {
+          timestamp = filesTimestamps[i];
+          break;
+        }
+      }
+    }
+    if (timestamp == null) {
+      throw new IOException("TimeStamp of the uri couldnot be found");
+    }
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+          Long.parseLong(timestamp));
   }
   
   /**

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Oct 27 15:43:58 2009
@@ -151,23 +151,9 @@
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
-      String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
 
-      // Get the local path if the cacheFile is already localized or create one
-      // if it doesn't
-      Path localPath;
-      try {
-        localPath = lDirAlloc.getLocalPathToRead(cachePath, taskConf);
-      } catch (DiskErrorException de) {
-        localPath =
-            lDirAlloc.getLocalPathForWrite(cachePath, fileStatus.getLen(),
-                taskConf);
-      }
-
-      String baseDir = localPath.toString().replace(cacheId, "");
       Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          new Path(baseDir), fileStatus, 
+          cacheSubdir, fileStatus, 
           cacheFile.type == CacheFile.FileType.ARCHIVE,
           cacheFile.timestamp, workdirPath, false);
 
@@ -224,7 +210,7 @@
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      distributedCacheManager.releaseCache(c.uri, taskConf);
+      distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
     }
   }