You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ll...@apache.org on 2012/12/21 23:40:38 UTC

svn commit: r1425171 - in /hadoop/common/branches/branch-1: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: llu
Date: Fri Dec 21 22:40:37 2012
New Revision: 1425171

URL: http://svn.apache.org/viewvc?rev=1425171&view=rev
Log:
MAPREDUCE-4660. Update task placement policy for network topology with node group. (Junping Du via llu)

Added:
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
    hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Dec 21 22:40:37 2012
@@ -6,6 +6,9 @@ Release 1.2.0 - unreleased
 
   NEW FEATURES
 
+    MAPREDUCE-4660. Update task placement policy for network topology
+    with node group. (Junping Du via llu)
+
     HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
     hadoop client processes. (Yu Gao via llu)
 

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Dec 21 22:40:37 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.serve
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -327,8 +328,17 @@ public class FairScheduler extends TaskS
     public void jobAdded(JobInProgress job) {
       synchronized (FairScheduler.this) {
         eventLog.log("JOB_ADDED", job.getJobID());
-        JobInfo info = new JobInfo(new JobSchedulable(FairScheduler.this, job, TaskType.MAP),
-            new JobSchedulable(FairScheduler.this, job, TaskType.REDUCE));
+        JobSchedulable mapSched = ReflectionUtils.newInstance(
+            conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
+                JobSchedulable.class), conf);
+        mapSched.init(FairScheduler.this, job, TaskType.MAP);
+
+        JobSchedulable redSched = ReflectionUtils.newInstance(
+            conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
+                JobSchedulable.class), conf);
+        redSched.init(FairScheduler.this, job, TaskType.REDUCE);
+
+        JobInfo info = new JobInfo(mapSched, redSched);
         infos.put(job, info);
         poolMgr.addJob(job); // Also adds job into the right PoolScheduable
         update();
@@ -585,8 +595,10 @@ public class FairScheduler extends TaskS
   private void updateLastMapLocalityLevel(JobInProgress job,
       Task mapTaskLaunched, TaskTrackerStatus tracker) {
     JobInfo info = infos.get(job);
+    boolean isNodeGroupAware = conf.getBoolean(
+        "net.topology.nodegroup.aware", false);
     LocalityLevel localityLevel = LocalityLevel.fromTask(
-        job, mapTaskLaunched, tracker);
+        job, mapTaskLaunched, tracker, isNodeGroupAware);
     info.lastMapLocalityLevel = localityLevel;
     info.timeWaitedForLocalMap = 0;
     eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Fri Dec 21 22:40:37 2012
@@ -25,9 +25,9 @@ import org.apache.hadoop.mapred.FairSche
 import org.apache.hadoop.mapreduce.TaskType;
 
 public class JobSchedulable extends Schedulable {
-  private FairScheduler scheduler;
-  private JobInProgress job;
-  private TaskType taskType;
+  protected FairScheduler scheduler;
+  protected JobInProgress job;
+  protected TaskType taskType;
   private int demand = 0;
 
   public JobSchedulable(FairScheduler scheduler, JobInProgress job, 
@@ -38,6 +38,18 @@ public class JobSchedulable extends Sche
     
     initMetrics();
   }
+
+  public JobSchedulable() {
+  }
+
+  public void init(FairScheduler scheduler, JobInProgress job,
+      TaskType taskType) {
+    this.scheduler = scheduler;
+    this.job = job;
+    this.taskType = taskType;
+
+    initMetrics();
+  }
   
   @Override
   public TaskType getTaskType() {
@@ -87,7 +99,7 @@ public class JobSchedulable extends Sche
     }
   }
 
-  private boolean isRunnable() {
+  protected boolean isRunnable() {
     JobInfo info = scheduler.getJobInfo(job);
     int runState = job.getStatus().getRunState();
     return (info != null && info.runnable && runState == JobStatus.RUNNING);

Added: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java?rev=1425171&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulableWithNodeGroup.java Fri Dec 21 22:40:37 2012
@@ -0,0 +1,62 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class JobSchedulableWithNodeGroup extends JobSchedulable {
+
+  public JobSchedulableWithNodeGroup(FairScheduler scheduler,
+      JobInProgress job, TaskType taskType) {
+    super(scheduler, job, taskType);
+  }
+
+  public JobSchedulableWithNodeGroup() {
+  }
+
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    if (isRunnable()) {
+      visited.add(job);
+      TaskTrackerManager ttm = scheduler.taskTrackerManager;
+      ClusterStatus clusterStatus = ttm.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+
+      // check with the load manager whether it is safe to 
+      // launch this task on this taskTracker.
+      LoadManager loadMgr = scheduler.getLoadManager();
+      if (!loadMgr.canLaunchTask(tts, job, taskType)) {
+        return null;
+      }
+      if (taskType == TaskType.MAP) {
+        LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
+            job, currentTime);
+        scheduler.getEventLog().log(
+            "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
+        switch (localityLevel) {
+          case NODE:
+            return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+          case NODEGROUP:
+            // locality level for nodegroup is 2
+            return job.obtainNewMapTaskCommon(tts, numTaskTrackers, 
+                ttm.getNumberOfUniqueHosts(), 2);
+          case RACK:
+            return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+          default:
+            return job.obtainNewMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+        }
+      } else {
+        return job.obtainNewReduceTask(tts, numTaskTrackers,
+            ttm.getNumberOfUniqueHosts());
+      }
+    } else {
+      return null;
+    }
+  }
+
+}

Modified: hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Fri Dec 21 22:40:37 2012
@@ -23,13 +23,14 @@ package org.apache.hadoop.mapred;
  * is allowed to launch tasks. By default, jobs are not allowed to launch
  * non-data-local tasks until they have waited a small number of seconds to
  * find a slot on a node that they have data on. If a job has waited this
- * long, it is allowed to launch rack-local tasks as well (on nodes that may
- * not have the task's input data, but share a rack with a node that does).
- * Finally, after a further wait, jobs are allowed to launch tasks anywhere
- * in the cluster.
+ * long, it is allowed to launch other locality tasks as well, such as: 
+ * nodegroup-local if the topology support nodegroup layer, rack-local (on 
+ * nodes that may not have the task's input data, but share a rack with a node
+ * that does). Finally, after a further wait, jobs are allowed to launch tasks
+ * anywhere in the cluster.
  * 
- * This enum defines three levels - NODE, RACK and ANY (for allowing tasks
- * to be launched on any node). A map task's level can be obtained from
+ * This enum defines four levels - NODE, NODEGROUP, RACK and ANY (for allowing
+ * tasks to be launched on any node). A map task's level can be obtained from
  * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In
  * addition, for any locality level, it is possible to get a "level cap" to pass
  * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
@@ -37,16 +38,25 @@ package org.apache.hadoop.mapred;
  * the {@link #toCacheLevelCap()} method.
  */
 public enum LocalityLevel {
-  NODE, RACK, ANY;
+  NODE, NODEGROUP, RACK, ANY;
   
   public static LocalityLevel fromTask(JobInProgress job, Task mapTask,
-      TaskTrackerStatus tracker) {
+      TaskTrackerStatus tracker, boolean isNodeGroupAware) {
     TaskID tipID = mapTask.getTaskID().getTaskID();
     TaskInProgress tip = job.getTaskInProgress(tipID);
-    switch (job.getLocalityLevel(tip, tracker)) {
-    case 0: return LocalityLevel.NODE;
-    case 1: return LocalityLevel.RACK;
-    default: return LocalityLevel.ANY;
+    if (isNodeGroupAware) {
+      switch (job.getLocalityLevel(tip, tracker)) {
+        case 0: return LocalityLevel.NODE;
+        case 1: return LocalityLevel.NODEGROUP;
+        case 2: return LocalityLevel.RACK;
+        default: return LocalityLevel.ANY;
+      }
+    } else {
+      switch (job.getLocalityLevel(tip, tracker)) {
+        case 0: return LocalityLevel.NODE;
+        case 1: return LocalityLevel.RACK;
+        default: return LocalityLevel.ANY;
+      }
     }
   }
   
@@ -55,11 +65,20 @@ public enum LocalityLevel {
    * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
    * to ensure that only tasks of this locality level and lower are launched.
    */
-  public int toCacheLevelCap() {
-    switch(this) {
-    case NODE: return 1;
-    case RACK: return 2;
-    default: return Integer.MAX_VALUE;
+  public int toCacheLevelCap(boolean isNodeGroupAware) {
+    if (isNodeGroupAware) {
+      switch(this) {
+        case NODE: return 1;
+        case NODEGROUP: return 2;
+        case RACK: return 3;
+        default: return Integer.MAX_VALUE;
+      }
+    } else {
+      switch(this) {
+        case NODE: return 1;
+        case RACK: return 2;
+        default: return Integer.MAX_VALUE;
+      }
     }
   }
 }

Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Fri Dec 21 22:40:37 2012
@@ -297,6 +297,20 @@
 </property>
 
 <property>
+  <name>mapred.jobtracker.nodegroup.aware</name>
+  <value>false</value>
+  <description>Identify if jobtracker is aware of nodegroup layer.</description>
+</property>
+
+<property>
+  <name>mapred.jobtracker.jobSchedulable</name>
+  <value>org.apache.hadoop.mapred.JobSchedulable</value>
+  <description>The class responsible for an entity in FairScheduler that can
+  launch tasks.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</name>
   <value></value>
   <description>The maximum number of running tasks for a job before

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 22:40:37 2012
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.Counters.CountersExceededException;
-import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
@@ -70,7 +69,7 @@ import org.apache.hadoop.util.StringUtil
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
- * a Job on the straight and narrow.  It keeps its JobProfile
+ * a Job on the straight and narrow. It keeps its JobProfile
  * and its latest JobStatus, plus a set of tables for 
  * doing bookkeeping of its Tasks.
  * ***********************************************************
@@ -255,7 +254,6 @@ public class JobInProgress {
   private String submitHostAddress;
   private String user;
   private String historyFile = "";
-  private boolean historyFileCopied;
   
   // Per-job counters
   public static enum Counter { 
@@ -265,6 +263,7 @@ public class JobInProgress {
     TOTAL_LAUNCHED_REDUCES,
     OTHER_LOCAL_MAPS,
     DATA_LOCAL_MAPS,
+    NODEGROUP_LOCAL_MAPS,
     RACK_LOCAL_MAPS,
     SLOTS_MILLIS_MAPS,
     SLOTS_MILLIS_REDUCES,
@@ -519,7 +518,7 @@ public class JobInProgress {
   public void cleanUpMetrics() {
     // per job metrics is disabled for now.
   }
-    
+
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
     LOG.info("The taskcache info:");
     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -701,8 +700,8 @@ public class JobInProgress {
     TaskSplitMetaInfo[] splits = createSplits(jobId);
     if (numMapTasks != splits.length) {
       throw new IOException("Number of maps in JobConf doesn't match number of " +
-      		"recieved splits for job " + jobId + "! " +
-      		"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
+          "recieved splits for job " + jobId + "! " +
+          "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
     }
     numMapTasks = splits.length;
 
@@ -1310,27 +1309,49 @@ public class JobInProgress {
                                             int clusterSize, 
                                             int numUniqueHosts
                                            ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 
+        anyCacheLevel);
+  }
+
+  /**
+   * Return a MapTask with locality level that smaller or equal than a given
+   * locality level to tasktracker.
+   * 
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
+   * @param avgProgress The average progress of this kind of task in this job
+   * @param maxCacheLevel The maximum topology level until which to schedule
+   *                      maps.
+   * @return the index in tasks of the selected task (or -1 for no task)
+   * @throws IOException
+   */
+  public synchronized Task obtainNewMapTaskCommon(
+      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts, 
+      int maxCacheLevel) throws IOException {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
       return null;
     }
-        
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
+
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel, 
                                 status.mapProgress());
     if (target == -1) {
       return null;
     }
-    
+
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
+      // DO NOT reset for off-switch!
+      if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
+        resetSchedulingOpportunities();
+      }
     }
-
     return result;
-  }    
+  }
 
   /*
    * Return task cleanup attempt if any, to run on a given tracker
@@ -1373,78 +1394,22 @@ public class JobInProgress {
   public synchronized Task obtainNewNodeLocalMapTask(TaskTrackerStatus tts,
                                                      int clusterSize,
                                                      int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 1, 
-                                status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
-    }
-
-    return result;
+      throws IOException {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 1);
   }
   
   public synchronized Task obtainNewNodeOrRackLocalMapTask(
       TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
   throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
-                                status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      resetSchedulingOpportunities();
-    }
-
-    return result;
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
   }
   
   public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
                                                     int clusterSize, 
                                                     int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      try { throw new IOException("state = " + status.getRunState()); }
-      catch (IOException ioe) {ioe.printStackTrace();}
-      return null;
-    }
-
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL, status.mapProgress());
-    if (target == -1) {
-      return null;
-    }
-
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-      // DO NOT reset for off-switch!
-    }
-
-    return result;
+      throws IOException {
+    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 
+        NON_LOCAL_CACHE_LEVEL);
   }
   
   public void schedulingOpportunity() {
@@ -1749,7 +1714,7 @@ public class JobInProgress {
     // keeping the earlier ordering intact
     String name;
     String splits = "";
-    Enum counter = null;
+    Enum<Counter> counter = null;
     if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
@@ -1815,26 +1780,57 @@ public class JobInProgress {
           }
         }
       }
-      switch (level) {
-      case 0 :
-        LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+      logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware());
+    }
+  }
+
+  private void logAndIncreJobCounters(TaskInProgress tip, int level, 
+      boolean isNodeGroupAware) {
+    switch (level) {
+      case 0:
+        logAndIncrDataLocalMaps(tip);
         break;
       case 1:
-        LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+        if (isNodeGroupAware) {
+          logAndIncrNodeGroupLocalMaps(tip);
+        } else {
+          logAndIncrRackLocalMaps(tip);
+        }
         break;
-      default :
+      case 2:
+        if (isNodeGroupAware) {
+          logAndIncrRackLocalMaps(tip);
+        }
+        break;
+      default:
         // check if there is any locality
         if (level != this.maxLevel) {
-          LOG.info("Choosing cached task at level " + level + tip.getTIPId());
-          jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+          logAndIncrOtherLocalMaps(tip, level);
         }
         break;
-      }
     }
   }
 
+  private void logAndIncrOtherLocalMaps(TaskInProgress tip, int level) {
+    LOG.info("Choosing cached task at level " + level + tip.getTIPId());
+    jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrNodeGroupLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing nodeGroup-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.NODEGROUP_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrRackLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing rack-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+  }
+
+  private void logAndIncrDataLocalMaps(TaskInProgress tip) {
+    LOG.info("Choosing data-local task " + tip.getTIPId());
+    jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+  }
+
   void setFirstTaskLaunchTime(TaskInProgress tip) {
     TaskType key = tip.getFirstTaskType();
 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Dec 21 22:40:37 2012
@@ -22,6 +22,7 @@ TOTAL_LAUNCHED_REDUCES.name=       Launc
 OTHER_LOCAL_MAPS.name=             Other local map tasks
 DATA_LOCAL_MAPS.name=              Data-local map tasks
 RACK_LOCAL_MAPS.name=              Rack-local map tasks
+NODEGROUP_LOCAL_MAPS.name=         NodeGroup-local map tasks
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)
 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Dec 21 22:40:37 2012
@@ -172,7 +172,8 @@ class JobQueueTaskScheduler extends Task
 
           Task t = null;
           
-          // Try to schedule a node-local or rack-local Map task
+          // Try to schedule a Map task with locality between node-local 
+          // and rack-local
           t = 
             job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                 numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Dec 21 22:40:37 2012
@@ -204,8 +204,9 @@ public class JobTracker implements MRCon
   static final String JOB_INFO_FILE = "job-info";
   static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
-  private NetworkTopology clusterMap = new NetworkTopology();
+  private NetworkTopology clusterMap;
   private int numTaskCacheLevels; // the max level to which we cache tasks
+  private boolean isNodeGroupAware;
   /**
    * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
    * so that it can be safely written to and iterated on via 2 separate threads.
@@ -1935,6 +1936,11 @@ public class JobTracker implements MRCon
     LOG.info("Starting jobtracker with owner as " +
         getMROwner().getShortUserName());
 
+    // Create network topology
+    clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
+            conf.getClass("net.topology.impl", NetworkTopology.class,
+                NetworkTopology.class), conf);
+
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass("mapred.jobtracker.taskScheduler",
@@ -2000,6 +2006,8 @@ public class JobTracker implements MRCon
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
+    this.isNodeGroupAware = conf.getBoolean(
+            "mapred.jobtracker.nodegroup.aware", false);
     
     plugins = conf.getInstances("mapreduce.jobtracker.plugins",
         ServicePlugin.class);
@@ -2821,7 +2829,9 @@ public class JobTracker implements MRCon
   public int getNumberOfUniqueHosts() {
     return uniqueHostsMap.size();
   }
-  
+  public boolean isNodeGroupAware() {
+    return isNodeGroupAware;
+  }
   public void addJobInProgressListener(JobInProgressListener listener) {
     jobInProgressListeners.add(listener);
   }
@@ -2971,7 +2981,7 @@ public class JobTracker implements MRCon
         }
       }
     }
-      
+
     // Check for tasks to be killed
     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
     if (killTasksList != null) {

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1425171&r1=1425170&r2=1425171&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Dec 21 22:40:37 2012
@@ -56,7 +56,7 @@ public class MiniMRCluster {
     
   private String namenode;
   private UserGroupInformation ugi = null;
-  private JobConf conf;
+  protected JobConf conf;
   private int numTrackerToExclude;
     
   private JobConf job;

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java?rev=1425171&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java Fri Dec 21 22:40:37 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+
+public class MiniMRClusterWithNodeGroup extends MiniMRCluster {
+
+  private String[] nodeGroups;
+
+  public MiniMRClusterWithNodeGroup(int numTaskTrackers, String namenode, int numDir, 
+      String[] racks, String[] nodeGroups, String[] hosts, JobConf conf) throws IOException {
+    super(numTaskTrackers, namenode, numDir, racks, hosts, conf);
+    this.nodeGroups = nodeGroups;
+  }
+
+  /**
+   * Start the tasktracker.
+   */
+  @Override
+  public void startTaskTracker(String host, String rack,
+      int idx, int numDir) throws IOException {
+    if (rack != null && nodeGroups != null) {
+      StaticMapping.addNodeToRack(host, rack + nodeGroups);
+    }
+
+    if (host != null) {
+      NetUtils.addStaticResolution(host, "localhost");
+    }
+
+    TaskTrackerRunner taskTracker;
+    taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+
+    addTaskTracker(taskTracker);
+  }
+
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java?rev=1425171&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestNodeGroupAwareTaskPlacement.java Fri Dec 21 22:40:37 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.StaticMapping;
+import org.junit.BeforeClass;
+
+public class TestNodeGroupAwareTaskPlacement extends TestCase {
+
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String nodeGroup1[] = new String[] {
+    "/nodegroup1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.nodegroup1.rack1"
+  };
+  private static final String rack2[] = new String[] {
+    "/r1", "/r2"
+  };
+  private static final String nodeGroup2[] = new String[] {
+    "/nodegroup2", "/nodegroup3"
+  };
+  private static final String hosts2[] = new String[] {
+    "host2.nodegroup2.rack1", "host2.nodegroup3.rack2"
+  };
+  private static final String hosts3[] = new String[] {
+    "host2.nodegroup3.rack2"
+  };
+  private static final String nodeGroup3[] = new String[] {
+    "/nodegroup3"
+  };
+  private static final String rack3[] = new String[] {
+    "/r2"
+  };
+  private static final String hosts4[] = new String[] {
+    "host3.nodegroup1.rack1"
+  };
+  private static final String nodeGroup4[] = new String[] {
+    "/nodegroup1"
+  };
+  private static final String rack4[] = new String[] {
+    "/r1"
+  };
+  final Path inDir = new Path("/nodegrouptesting");
+  final Path outputPath = new Path("/output");
+
+  /**
+   * Launches a MR job and tests the job counters against the expected values.
+   * @param testName The name for the job
+   * @param mr The MR cluster
+   * @param fileSys The FileSystem
+   * @param in Input path
+   * @param out Output path
+   * @param numMaps Number of maps
+   * @param otherLocalMaps Expected value of other local maps
+   * @param rackLocalMaps Expected value of rack local maps
+   * @param nodeGroupLocalMaps Expected value of nodeGroup local maps
+   * @param dataLocalMaps Expected value of data(node) local maps
+   * @param jobConfig Configuration for running job
+   */
+  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+                                       FileSystem fileSys, Path in, Path out,
+                                       int numMaps, int otherLocalMaps,
+                                       int rackLocalMaps, int nodeGroupLocalMaps,
+                                       int dataLocalMaps, JobConf jobConfig)
+  throws IOException {
+    JobConf jobConf = mr.createJobConf(jobConfig);
+    if (fileSys.exists(out)) {
+        fileSys.delete(out, true);
+    }
+    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+    Counters counters = job.getCounters();
+    assertEquals("Number of local maps",
+            counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+    assertEquals("Number of Data-local maps",
+            counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS),
+                                dataLocalMaps);
+    assertEquals("Number of NodeGroup-local maps",
+            counters.getCounter(JobInProgress.Counter.NODEGROUP_LOCAL_MAPS),
+                                nodeGroupLocalMaps);
+    assertEquals("Number of Rack-local maps",
+            counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS),
+                                rackLocalMaps);
+
+    mr.waitUntilIdle();
+    mr.shutdown();
+  }
+
+  @BeforeClass
+  public void setUp(){
+    // map host to related locations
+    StaticMapping.addNodeToRack(hosts1[0], rack1[0]+nodeGroup1[0]);
+    StaticMapping.addNodeToRack(hosts2[0], rack2[0]+nodeGroup2[0]);
+    StaticMapping.addNodeToRack(hosts2[1], rack2[1]+nodeGroup2[1]);
+    StaticMapping.addNodeToRack(hosts4[0], rack4[0]+nodeGroup4[0]);
+  }
+
+  public void testTaskPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSClusterWithNodeGroup dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    String testName = "TestForNodeGroupAwareness";
+    try {
+      final int taskTrackers = 1;
+
+      /* Start 4 datanodes, two in rack r1/nodegroup1, one in r1/nodegroup2 and
+       * the other one in r2/nodegroup3. Create three
+       * files (splits).
+       * 1) file1, just after starting the datanode on r1/nodegroup1, with
+       *    a repl factor of 1, and,
+       * 2) file2 & file3 after starting the two datanodes in r1/nodegroup2 and
+       *    r2/nodegroup3, with a repl factor of 3.
+       * 3) start the last data node (datanode4) in r1/nodegroup1
+       * At the end, file1 will be present on only datanode1, and, file2 and
+       * file3, will be present on all datanodes except datanode4.
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+
+      conf.set("dfs.block.replicator.classname",
+          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+
+      conf.set("net.topology.impl",
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+      conf.setBoolean("net.topology.nodegroup.aware", true);
+
+      conf.setBoolean("mapred.jobtracker.nodegroup.aware", true);
+      conf.setInt("mapred.task.cache.levels", 3);
+
+      conf.set("mapred.jobtracker.jobSchedulable",
+          "org.apache.hadoop.mapred.JobSchedulableWithNodeGroup");
+
+      JobConf jobConf = new JobConf(conf);
+
+      MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroup1);
+      // start the dfs cluster with datanode1 only.
+      dfs = new MiniDFSClusterWithNodeGroup(0, conf, 1,
+                true, true, null, rack1, hosts1, null);
+
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      // write file1 on datanode1 with 1 replica
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      // start another two datanodes (2 and 3)
+      dfs.startDataNodes(conf, 2, true, null, rack2, nodeGroup2, hosts2, null);
+
+      dfs.waitActive();
+      // write two files with 3 replica, so each datanodes will have one replica
+      // of file2 and file3
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      UtilsForTests.writeFile(
+          dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+                 (dfs.getFileSystem()).getUri().getPort();
+      /* Run a job with the (only)tasktracker which is under r2/nodegroup3 and
+       * check the task placement that how many data/nodegroup/rack local maps
+       *  it runs. The hostname of the tasktracker is set to same as datanode3.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack3,
+          nodeGroup3, hosts3, jobConf);
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. On rack2, there are two files and both
+       * have repl of three. The blocks for those files must therefore be
+       * present on all the datanodes (except datanode4), in particular,
+       * the datanode3 on rack2. The third input file is pulled from rack1,
+       * thus the result should be 2 rack-local maps.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 0, 2, jobConf);
+      mr.shutdown();
+
+      /* Run a job with the (only)tasktracker on datanode4.
+       */
+      mr = new MiniMRClusterWithNodeGroup(taskTrackers, namenode, 1, rack4,
+          nodeGroup4, hosts4, jobConf);
+
+      /* The job is configured with three maps since there are three
+       * (non-splittable) files. As the way in which repl was setup while
+       * creating the files, we will have all the three files on datanode1 which
+       * is on the same nodegroup with datanode4 where the only tasktracker run.
+       * Thus, the result should be 3 nodegroup-local maps.
+       * The MapReduce cluster have only 1 node which is host4 but no datanode
+       * running on that host. So this is to verify that in compute/data node
+       * separation case, it still can get nodegroup level locality in task
+       * scheduling.
+       */
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+          0, 3, 0, jobConf);
+      mr.shutdown();
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+                              int numMaps, String jobName) throws IOException {
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
+}