You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/05/19 23:08:16 UTC

svn commit: r657979 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Mon May 19 14:08:15 2008
New Revision: 657979

URL: http://svn.apache.org/viewvc?rev=657979&view=rev
Log:
HADOOP-3296. Fix task cache to work for more than two levels in the cache
hierarchy. This also adds a new counter to track cache hits at levels greater
than two. Contributed by Amar Kamat.


Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 19 14:08:15 2008
@@ -284,6 +284,10 @@
 
     HADOOP-3409. Namenode should save the root inode into fsimage. (hairong)
 
+    HADOOP-3296. Fix task cache to work for more than two levels in the cache
+    hierarchy. This also adds a new counter to track cache hits at levels
+    greater than two. (Amar Kamat via cdouglas)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon May 19 14:08:15 2008
@@ -137,6 +137,7 @@
     NUM_FAILED_REDUCES,
     TOTAL_LAUNCHED_MAPS,
     TOTAL_LAUNCHED_REDUCES,
+    OTHER_LOCAL_MAPS,
     DATA_LOCAL_MAPS,
     RACK_LOCAL_MAPS
   }
@@ -267,7 +268,6 @@
         Node node = jobtracker.resolveAndAddToTopology(host);
         LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
         for (int j = 0; j < maxLevel; j++) {
-          node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
             hostMaps = new ArrayList<TaskInProgress>();
@@ -282,6 +282,7 @@
           if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
             hostMaps.add(maps[i]);
           }
+          node = node.getParent();
         }
       }
     }
@@ -1090,6 +1091,7 @@
             } else {
               LOG.info("Choosing cached task at level " + level 
                        + tip.getTIPId());
+              jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
             }
 
             return tip.getIdWithinJob();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties Mon May 19 14:08:15 2008
@@ -6,6 +6,7 @@
 NUM_FAILED_REDUCES.name=       Failed reduce tasks
 TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
 TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
+OTHER_LOCAL_MAPS.name=         Other local map tasks
 DATA_LOCAL_MAPS.name=          Data-local map tasks
 RACK_LOCAL_MAPS.name=          Rack-local map tasks
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon May 19 14:08:15 2008
@@ -57,6 +57,10 @@
     
     JobConf jc = null;
         
+    public JobTrackerRunner(JobConf conf) {
+      jc = conf;
+    }
+
     public boolean isUp() {
       return (tracker != null);
     }
@@ -74,7 +78,7 @@
      */
     public void run() {
       try {
-        jc = createJobConf();
+        jc = (jc == null) ? createJobConf() : createJobConf(jc);
         jc.set("mapred.local.dir","build/test/mapred/local");
         jc.setClass("topology.node.switch.mapping.impl", 
             StaticMapping.class, DNSToSwitchMapping.class);
@@ -246,7 +250,11 @@
   }
 
   public JobConf createJobConf() {
-    JobConf result = new JobConf();
+    return createJobConf(new JobConf());
+  }
+
+  public JobConf createJobConf(JobConf conf) {
+    JobConf result = new JobConf(conf);
     FileSystem.setDefaultUri(result, namenode);
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
     result.set("mapred.job.tracker.http.address", 
@@ -278,6 +286,22 @@
    * @param numTaskTrackers no. of tasktrackers in the cluster
    * @param namenode the namenode
    * @param numDir no. of directories
+   * @param racks Array of racks
+   * @param hosts Array of hosts in the corresponding racks
+   * @param conf Default conf for the jobtracker
+   * @throws IOException
+   */
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, 
+                       String[] racks, String[] hosts, JobConf conf) 
+  throws IOException {
+    this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
+  }
+
+  /**
+   * Create the config and the cluster.
+   * @param numTaskTrackers no. of tasktrackers in the cluster
+   * @param namenode the namenode
+   * @param numDir no. of directories
    * @throws IOException
    */
   public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) 
@@ -319,6 +343,14 @@
       int numTaskTrackers, String namenode, 
       int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi
       ) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+         numDir, racks, hosts, ugi, null);
+  }
+
+  public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+      int numTaskTrackers, String namenode, 
+      int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
+      JobConf conf) throws IOException {
     if (racks != null && racks.length < numTaskTrackers) {
       LOG.error("Invalid number of racks specified. It should be at least " +
           "equal to the number of tasktrackers");
@@ -344,7 +376,7 @@
     this.ugi = ugi;
 
     // Create the JobTracker
-    jobTracker = new JobTrackerRunner();
+    jobTracker = new JobTrackerRunner(conf);
     jobTrackerThread = new Thread(jobTracker);
         
     jobTrackerThread.start();

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=657979&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Mon May 19 14:08:15 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;
+
+/**
+ * This test checks whether the task caches are created and used properly.
+ */
+public class TestMultipleLevelCaching extends TestCase {
+  private static final int MAX_LEVEL = 5;
+  final Path inDir = new Path("/cachetesting");
+  final Path outputPath = new Path("/output");
+
+  /**
+   * Returns a string representing a rack with level + 1 nodes in the topology
+   * for the rack.
+   * For id = 2, level = 2 we get /a/b2/c2
+   *     id = 1, level = 3 we get /a/b1/c1/d1
+   * NOTE There should always be one shared node i.e /a 
+   * @param id Unique Id for the rack
+   * @param level Number of rack-specific (id-suffixed) levels below the shared /a
+   */
+  private static String getRack(int id, int level) {
+    StringBuilder rack = new StringBuilder();
+    char alpha = 'a';
+    int length = level + 1;
+    while (length > level) {
+      rack.append("/");
+      rack.append(alpha);
+      ++alpha;
+      --length;
+    }
+    while (length > 0) {
+      rack.append("/");
+      rack.append(alpha);
+      rack.append(id);
+      ++alpha;
+      --length;
+    }
+    return rack.toString();
+  }
+
+  public void testMultiLevelCaching() throws IOException {
+    for (int i = 1 ; i <= MAX_LEVEL; ++i) {
+      testCachingAtLevel(i);
+    }
+  }
+
+  private void testCachingAtLevel(int level) throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    String testName = "TestMultiLevelCaching";
+    try {
+      final int taskTrackers = 1;
+      // generate the racks
+      // use rack1 for data node
+      String rack1 = getRack(0, level);
+      // use rack2 for task tracker
+      String rack2 = getRack(1, level);
+      Configuration conf = new Configuration();
+      // Run a datanode on host1 under rack1, e.g. /a/b0/c0 for level = 2
+      dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1}, 
+                               new String[] {"host1.com"});
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
+    		                               new Path(inDir + "/file"), (short)1);
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
+                 (dfs.getFileSystem()).getUri().getPort();
+
+      // Run a job with the (only) tasktracker on host2 under a different
+      // subtree (rack2), e.g. /a/b1/c1 for level = 2.
+      JobConf jc = new JobConf();
+      // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) 
+      //               + 1 (for host)
+      jc.setInt("mapred.task.cache.levels", level + 2);
+      mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, 
+    		                 new String[] {"host2.com"}, jc);
+
+      /* The job is configured with 1 map for one (non-splittable) file. 
+       * Since the datanode is running under different subtree, there is no
+       * node-level data locality but there should be topological locality.
+       */
+      TestRackAwareTaskPlacement.launchJobAndTestCounters(
+    		  testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
+      mr.shutdown();
+    } finally {
+      fileSys.delete(inDir, true);
+      fileSys.delete(outputPath, true);
+      if (dfs != null) { 
+        dfs.shutdown(); 
+      }
+    }
+  }
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon May 19 14:08:15 2008
@@ -55,11 +55,47 @@
   final Path inDir = new Path("/racktesting");
   final Path outputPath = new Path("/output");
   
+  /**
+   * Launches a MR job and tests the job counters against the expected values.
+   * @param jobName The name for the job
+   * @param mr The MR cluster
+   * @param fileSys The FileSystem
+   * @param in Input path
+   * @param out Output path
+   * @param numMaps Number of maps
+   * @param otherLocalMaps Expected value of other local maps
+   * @param dataLocalMaps Expected value of data(node) local maps
+   * @param rackLocalMaps Expected value of rack local maps
+   */
+  static void launchJobAndTestCounters(String jobName, MiniMRCluster mr, 
+                                       FileSystem fileSys, Path in, Path out,
+                                       int numMaps, int otherLocalMaps,
+                                       int dataLocalMaps, int rackLocalMaps) 
+  throws IOException {
+    JobConf jobConf = mr.createJobConf();
+    if (fileSys.exists(out)) {
+        fileSys.delete(out, true);
+    }
+    RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+    Counters counters = job.getCounters();
+    assertEquals("Number of local maps", 
+            counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+    assertEquals("Number of Data-local maps", 
+            counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 
+                                dataLocalMaps);
+    assertEquals("Number of Rack-local maps", 
+            counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 
+                                rackLocalMaps);
+    mr.waitUntilIdle();
+    mr.shutdown();
+  }
+
   public void testTaskPlacement() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
+    String testName = "TestForRackAwareness";
     try {
       final int taskTrackers = 1;
 
@@ -95,42 +131,29 @@
        * datanodes.
        */
       mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
-      JobConf jobConf = mr.createJobConf();
-      if (fileSys.exists(outputPath)) {
-        fileSys.delete(outputPath, true);
-      }
+
       /* The job is configured with three maps since there are three 
        * (non-splittable) files. On rack2, there are two files and both
        * have repl of three. The blocks for those files must therefore be
        * present on all the datanodes, in particular, the datanodes on rack2.
        * The third input file is pulled from rack1.
        */
-      RunningJob job = launchJob(jobConf, 3);
-      Counters counters = job.getCounters();
-      assertEquals("Number of Data-local maps", 
-          counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 2);
-      assertEquals("Number of Rack-local maps", 
-          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 0);
-      mr.waitUntilIdle();
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+                               2, 0);
+      mr.shutdown();
       
       /* Run a job with the (only)tasktracker on rack1.
        */
       mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
-      jobConf = mr.createJobConf();
-      fileSys = dfs.getFileSystem();
-      if (fileSys.exists(outputPath)) {
-        fileSys.delete(outputPath, true);
-      }
+
       /* The job is configured with three maps since there are three 
        * (non-splittable) files. On rack1, because of the way in which repl
        * was setup while creating the files, we will have all the three files. 
        * Thus, a tasktracker will find all inputs in this rack.
        */
-      job = launchJob(jobConf, 3);
-      counters = job.getCounters();
-      assertEquals("Number of Rack-local maps",
-          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 3);
-      mr.waitUntilIdle();
+      launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+                               0, 3);
+      mr.shutdown();
       
     } finally {
       if (dfs != null) { 
@@ -141,7 +164,7 @@
       }
     }
   }
-  private void writeFile(NameNode namenode, Configuration conf, Path name, 
+  static void writeFile(NameNode namenode, Configuration conf, Path name, 
       short replication) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);
     SequenceFile.Writer writer = 
@@ -153,7 +176,7 @@
     fileSys.setReplication(name, replication);
     waitForReplication(fileSys, namenode, name, replication);
   }
-  private void waitForReplication(FileSystem fileSys, NameNode namenode, 
+  static void waitForReplication(FileSystem fileSys, NameNode namenode, 
       Path name, short replication) throws IOException {
     //wait for the replication to happen
     boolean isReplicationDone;
@@ -173,8 +196,10 @@
       }
     } while(!isReplicationDone);
   }
-  private RunningJob launchJob(JobConf jobConf, int numMaps) throws IOException {
-    jobConf.setJobName("TestForRackAwareness");
+
+  static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 
+                              int numMaps, String jobName) throws IOException {
+    jobConf.setJobName(jobName);
     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
     FileInputFormat.setInputPaths(jobConf, inDir);