You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/06 11:14:49 UTC

svn commit: r791418 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Mon Jul  6 09:14:48 2009
New Revision: 791418

URL: http://svn.apache.org/viewvc?rev=791418&view=rev
Log:
MAPREDUCE-701. Improves the runtime of the TestRackAwareTaskPlacement by making it a unit test. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul  6 09:14:48 2009
@@ -72,6 +72,9 @@
     MAPREDUCE-692. Make Hudson run Sqoop unit tests.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-701. Improves the runtime of the TestRackAwareTaskPlacement
+    by making it a unit test.  (Jothi Padmanabhan via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jul  6 09:14:48 2009
@@ -98,10 +98,10 @@
   int speculativeReduceTasks = 0;
   
   // Limits on concurrent running tasks per-node and cluster-wide
-  private int maxMapsPerNode;
-  private int maxReducesPerNode;
-  private int runningMapLimit;
-  private int runningReduceLimit;
+  int maxMapsPerNode;
+  int maxReducesPerNode;
+  int runningMapLimit;
+  int runningReduceLimit;
   
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
@@ -140,7 +140,7 @@
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
-  private int maxLevel;
+  int maxLevel;
 
   /**
    * A special value indicating that 
@@ -434,7 +434,7 @@
     }
   }
   
-  private Map<Node, List<TaskInProgress>> createCache(
+  Map<Node, List<TaskInProgress>> createCache(
                          JobClient.RawSplit[] splits, int maxLevel) {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Mon Jul  6 09:14:48 2009
@@ -25,7 +25,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobCounter;
 
 /**
  * This test checks whether the task caches are created and used properly.
@@ -110,7 +114,7 @@
        * Since the datanode is running under different subtree, there is no
        * node-level data locality but there should be topological locality.
        */
-      TestRackAwareTaskPlacement.launchJobAndTestCounters(
+      launchJobAndTestCounters(
     		  testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
       mr.shutdown();
     } finally {
@@ -121,4 +125,57 @@
       }
     }
   }
+  
+
+  /**
+   * Launches a MR job and tests the job counters against the expected values.
+   * @param jobName The name for the job
+   * @param mr The MR cluster
+   * @param fileSys The FileSystem
+   * @param in Input path
+   * @param out Output path
+   * @param numMaps Number of maps
+   * @param otherLocalMaps Expected value of other local maps
+   * @param dataLocalMaps Expected value of data(node) local maps
+   * @param rackLocalMaps Expected value of rack local maps
+   */
+  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+                                       FileSystem fileSys, Path in, Path out,
+                                       int numMaps, int otherLocalMaps,
+                                       int dataLocalMaps, int rackLocalMaps)
+  throws IOException {
+    JobConf jobConf = mr.createJobConf();
+    if (fileSys.exists(out)) {
+        fileSys.delete(out, true);
+    }
+    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+    Counters counters = job.getCounters();
+    assertEquals("Number of local maps",
+            counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
+    assertEquals("Number of Data-local maps",
+            counters.getCounter(JobCounter.DATA_LOCAL_MAPS),
+                                dataLocalMaps);
+    assertEquals("Number of Rack-local maps",
+            counters.getCounter(JobCounter.RACK_LOCAL_MAPS),
+                                rackLocalMaps);
+    mr.waitUntilIdle();
+    mr.shutdown();
+  }
+  
+  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+      int numMaps, String jobName) throws IOException {
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon Jul  6 09:14:48 2009
@@ -15,167 +15,160 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import junit.extensions.TestSetup;
+import junit.framework.Test;
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.StaticMapping;
 
+/**
+ * A JUnit test that checks rack-aware (data-local / rack-local) placement of map tasks.
+ */
 public class TestRackAwareTaskPlacement extends TestCase {
-  private static final String rack1[] = new String[] {
-    "/r1"
-  };
-  private static final String hosts1[] = new String[] {
-    "host1.rack1.com"
-  };
-  private static final String rack2[] = new String[] {
-    "/r2", "/r2"
-  };
-  private static final String hosts2[] = new String[] {
-    "host1.rack2.com", "host2.rack2.com"
-  };
-  private static final String hosts3[] = new String[] {
-    "host3.rack1.com"
-  };
-  private static final String hosts4[] = new String[] {
-    "host1.rack2.com"
-  };
-  final Path inDir = new Path("/racktesting");
-  final Path outputPath = new Path("/output");
+
+  static String trackers[] = new String[] {"tracker_tracker1.r1.com:1000", 
+    "tracker_tracker2.r1.com:1000", "tracker_tracker3.r2.com:1000",
+    "tracker_tracker4.r3.com:1000"};
   
-  /**
-   * Launches a MR job and tests the job counters against the expected values.
-   * @param testName The name for the job
-   * @param mr The MR cluster
-   * @param fileSys The FileSystem
-   * @param in Input path
-   * @param out Output path
-   * @param numMaps Number of maps
-   * @param otherLocalMaps Expected value of other local maps
-   * @param datalocalMaps Expected value of data(node) local maps
-   * @param racklocalMaps Expected value of rack local maps
-   */
-  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr, 
-                                       FileSystem fileSys, Path in, Path out,
-                                       int numMaps, int otherLocalMaps,
-                                       int dataLocalMaps, int rackLocalMaps) 
-  throws IOException {
-    JobConf jobConf = mr.createJobConf();
-    if (fileSys.exists(out)) {
-        fileSys.delete(out, true);
-    }
-    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
-    Counters counters = job.getCounters();
-    assertEquals("Number of local maps", 
-            counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
-    assertEquals("Number of Data-local maps", 
-            counters.getCounter(JobCounter.DATA_LOCAL_MAPS), 
-                                dataLocalMaps);
-    assertEquals("Number of Rack-local maps", 
-            counters.getCounter(JobCounter.RACK_LOCAL_MAPS), 
-                                rackLocalMaps);
-    mr.waitUntilIdle();
-    mr.shutdown();
+  static String[] allHosts = 
+    new String[] {"tracker1.r1.com", "tracker2.r1.com", "tracker3.r2.com",
+                  "tracker4.r3.com"};
+  
+  static String[] allRacks =
+    new String[] { "/r1", "/r1", "/r2", "/r3"};
+
+  static FakeJobTracker jobTracker;
+  static String jtIdentifier = "test";
+  private static int jobCounter;
+  
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestRackAwareTaskPlacement.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("mapred.job.tracker", "localhost:0");
+        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        conf.setClass("topology.node.switch.mapping.impl", 
+          StaticMapping.class, DNSToSwitchMapping.class);
+        jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+        // Set up the Topology Information
+        for (int i = 0; i < allHosts.length; i++) {
+          StaticMapping.addNodeToRack(allHosts[i], allRacks[i]);
+        }
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+      }
+    };
+    return setup;
   }
+   
+  static class MyFakeJobInProgress extends JobInProgress {
+    static JobID jobid;
+    int numMaps;
+
+    MyFakeJobInProgress(JobConf jc, JobTracker jt) throws IOException {
+      super((jobid = new JobID(jtIdentifier, jobCounter ++)), jc, jt);
+      Path jobFile = new Path("Dummy");
+      this.profile = new JobProfile(jc.getUser(), jobid, 
+          jobFile.toString(), null, jc.getJobName(),
+          jc.getQueueName());
+      this.maxMapsPerNode = jc.getMaxMapsPerNode();
+      this.maxReducesPerNode = jc.getMaxReducesPerNode();
+      this.runningMapLimit = jc.getRunningMapLimit();
+      this.runningReduceLimit = jc.getRunningReduceLimit();
+    }
 
-  public void testTaskPlacement() throws IOException {
-    String namenode = null;
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    String testName = "TestForRackAwareness";
-    try {
-      final int taskTrackers = 1;
-
-      /* Start 3 datanodes, one in rack r1, and two in r2. Create three
-       * files (splits).
-       * 1) file1, just after starting the datanode on r1, with 
-       *    a repl factor of 1, and,
-       * 2) file2 & file3 after starting the other two datanodes, with a repl 
-       *    factor of 3.
-       * At the end, file1 will be present on only datanode1, and, file2 and 
-       * file3, will be present on all datanodes. 
-       */
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.replication.considerLoad", false);
-      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
-      dfs.waitActive();
-      fileSys = dfs.getFileSystem();
-      if (!fileSys.mkdirs(inDir)) {
-        throw new IOException("Mkdirs failed to create " + inDir.toString());
+    @Override
+    public void initTasks() throws IOException {
+      JobClient.RawSplit[] splits = createSplits();
+      numMapTasks = splits.length;
+      createMapTasks(null, splits);
+      nonRunningMapCache = createCache(splits, maxLevel);
+      tasksInited.set(true);
+      this.status.setRunState(JobStatus.RUNNING);
+
+    }
+  
+
+    protected JobClient.RawSplit[] createSplits() throws IOException {
+      RawSplit[] splits = new RawSplit[numMaps];
+      // Hand code for now. 
+      // M0,2,3 reside in Host1
+      // M1 resides in Host3
+      // M4 resides in Host4
+      String[] splitHosts0 = new String[] { allHosts[0] };
+
+      for (int i = 0; i < numMaps; i++) {
+        splits[i] = new RawSplit();
+        splits[i].setDataLength(0);
       }
-      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
-      dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
-      dfs.waitActive();
 
-      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
-      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+      splits[0].setLocations(splitHosts0);
+      splits[2].setLocations(splitHosts0);
+      splits[3].setLocations(splitHosts0);
       
-      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
-                 (dfs.getFileSystem()).getUri().getPort(); 
-      /* Run a job with the (only)tasktracker on rack2. The rack location
-       * of the tasktracker will determine how many data/rack local maps it
-       * runs. The hostname of the tasktracker is set to same as one of the 
-       * datanodes.
-       */
-      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
-
-      /* The job is configured with three maps since there are three 
-       * (non-splittable) files. On rack2, there are two files and both
-       * have repl of three. The blocks for those files must therefore be
-       * present on all the datanodes, in particular, the datanodes on rack2.
-       * The third input file is pulled from rack1.
-       */
-      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
-                               2, 0);
-      mr.shutdown();
-      
-      /* Run a job with the (only)tasktracker on rack1.
-       */
-      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
-
-      /* The job is configured with three maps since there are three 
-       * (non-splittable) files. On rack1, because of the way in which repl
-       * was setup while creating the files, we will have all the three files. 
-       * Thus, a tasktracker will find all inputs in this rack.
-       */
-      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
-                               0, 3);
-      mr.shutdown();
-      
-    } finally {
-      if (dfs != null) { 
-        dfs.shutdown(); 
-      }
-      if (mr != null) { 
-        mr.shutdown();
-      }
+      String[] splitHosts1 = new String[] { allHosts[2] };
+      splits[1].setLocations(splitHosts1);
+
+      String[] splitHosts2 = new String[] { allHosts[3] };
+      splits[4].setLocations(splitHosts2);
+
+      return splits;
     }
   }
-  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 
-                              int numMaps, String jobName) throws IOException {
-    jobConf.setJobName(jobName);
-    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outputPath);
-    jobConf.setMapperClass(IdentityMapper.class);
-    jobConf.setReducerClass(IdentityReducer.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    jobConf.setNumMapTasks(numMaps);
-    jobConf.setNumReduceTasks(0);
-    jobConf.setJar("build/test/mapred/testjar/testjob.jar");
-    return JobClient.runJob(jobConf);
+  @SuppressWarnings("deprecation")
+  public void testTaskPlacement() throws IOException {
+    JobConf conf = new JobConf();
+    conf.setNumReduceTasks(0);
+    conf.setJobName("TestTaskPlacement");
+    
+    MyFakeJobInProgress jip = new MyFakeJobInProgress(conf, jobTracker);
+    jip.numMaps = 5;
+    jip.initTasks();
+
+    // Tracker1 should get a rack local
+    TaskTrackerStatus tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
+    jip.obtainNewMapTask(tts, 4, 4);
+    
+    // Tracker0 should get a data local
+    tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
+    jip.obtainNewMapTask(tts, 4, 4);
+
+    // Tracker2 should get a data local
+    tts = new TaskTrackerStatus(trackers[2], allHosts[2]);
+    jip.obtainNewMapTask(tts, 4, 4);
+
+    // Tracker0 should get a data local
+    tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
+    jip.obtainNewMapTask(tts, 4, 4);
+
+    // Tracker1 should not get any locality at all
+    tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
+    jip.obtainNewMapTask(tts, 4, 4);
+
+    
+    Counters counters = jip.getCounters();
+    assertEquals("Number of data local maps", 3,
+        counters.getCounter(JobCounter.DATA_LOCAL_MAPS));
+    
+    assertEquals("Number of Rack-local maps", 1 , 
+        counters.getCounter(JobCounter.RACK_LOCAL_MAPS));
+    
+    assertEquals("Number of Other-local maps", 0, 
+        counters.getCounter(JobCounter.OTHER_LOCAL_MAPS));
+
   }
 }