Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/06 11:14:49 UTC
svn commit: r791418 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Mon Jul 6 09:14:48 2009
New Revision: 791418
URL: http://svn.apache.org/viewvc?rev=791418&view=rev
Log:
MAPREDUCE-701. Improves the runtime of TestRackAwareTaskPlacement by making it a unit test. Contributed by Jothi Padmanabhan.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
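
In outline, the patch replaces the MiniDFSCluster/MiniMRCluster harness with an in-process fake JobTracker, so the scheduling logic under test runs without starting any daemons. A minimal sketch of that pattern, using the FakeJobTracker and FakeClock test helpers that the rewritten test below relies on (constructor arguments as used in the patch):

    JobConf conf = new JobConf();
    conf.set("mapred.job.tracker", "localhost:0");
    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
    // In-process fake tracker driven by a fake clock; no DFS or MR daemons.
    FakeJobTracker jobTracker = new FakeJobTracker(
        conf, new FakeClock(), new String[] {"tracker_host1.r1.com:1000"});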
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul 6 09:14:48 2009
@@ -72,6 +72,9 @@
MAPREDUCE-692. Make Hudson run Sqoop unit tests.
(Aaron Kimball via tomwhite)
+ MAPREDUCE-701. Improves the runtime of TestRackAwareTaskPlacement
+ by making it a unit test. (Jothi Padmanabhan via ddas)
+
BUG FIXES
MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
(Aaron Kimball via matei)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jul 6 09:14:48 2009
@@ -98,10 +98,10 @@
int speculativeReduceTasks = 0;
// Limits on concurrent running tasks per-node and cluster-wide
- private int maxMapsPerNode;
- private int maxReducesPerNode;
- private int runningMapLimit;
- private int runningReduceLimit;
+ int maxMapsPerNode;
+ int maxReducesPerNode;
+ int runningMapLimit;
+ int runningReduceLimit;
int mapFailuresPercent = 0;
int reduceFailuresPercent = 0;
@@ -140,7 +140,7 @@
// A list of cleanup tasks for the reduce task attempts, to be launched
List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
- private int maxLevel;
+ int maxLevel;
/**
* A special value indicating that
@@ -434,7 +434,7 @@
}
}
- private Map<Node, List<TaskInProgress>> createCache(
+ Map<Node, List<TaskInProgress>> createCache(
JobClient.RawSplit[] splits, int maxLevel) {
Map<Node, List<TaskInProgress>> cache =
new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
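
The JobInProgress change is visibility only: a handful of fields and createCache() move from private to package-private, so a test living in the same org.apache.hadoop.mapred package can subclass JobInProgress and drive them directly, with no reflection and no running cluster. A self-contained illustration of the idiom, with hypothetical Widget/WidgetTest names that are not part of this patch:

    // Widget.java -- production class with a package-private member
    package org.example.mapred;

    class Widget {
      int maxLevel = 2;                      // was private; now package-private
      int cacheCapacity() { return maxLevel * 8; }
    }

    // WidgetTest.java -- same package, so the member is directly reachable
    package org.example.mapred;

    import junit.framework.TestCase;

    public class WidgetTest extends TestCase {
      public void testCacheCapacity() {
        Widget w = new Widget();
        w.maxLevel = 3;                      // tweak state the public API hides
        assertEquals(24, w.cacheCapacity());
      }
    }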
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Mon Jul 6 09:14:48 2009
@@ -25,7 +25,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobCounter;
/**
* This test checks whether the task caches are created and used properly.
@@ -110,7 +114,7 @@
* Since the datanode is running under different subtree, there is no
* node-level data locality but there should be topological locality.
*/
- TestRackAwareTaskPlacement.launchJobAndTestCounters(
+ launchJobAndTestCounters(
testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
mr.shutdown();
} finally {
@@ -121,4 +125,57 @@
}
}
}
+
+
+ /**
+ * Launches a MR job and tests the job counters against the expected values.
+ * @param jobName The name for the job
+ * @param mr The MR cluster
+ * @param fileSys The FileSystem
+ * @param in Input path
+ * @param out Output path
+ * @param numMaps Number of maps
+ * @param otherLocalMaps Expected value of other local maps
+ * @param dataLocalMaps Expected value of data(node) local maps
+ * @param rackLocalMaps Expected value of rack local maps
+ */
+ static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+ FileSystem fileSys, Path in, Path out,
+ int numMaps, int otherLocalMaps,
+ int dataLocalMaps, int rackLocalMaps)
+ throws IOException {
+ JobConf jobConf = mr.createJobConf();
+ if (fileSys.exists(out)) {
+ fileSys.delete(out, true);
+ }
+ RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+ Counters counters = job.getCounters();
+ assertEquals("Number of local maps",
+ counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
+ assertEquals("Number of Data-local maps",
+ counters.getCounter(JobCounter.DATA_LOCAL_MAPS),
+ dataLocalMaps);
+ assertEquals("Number of Rack-local maps",
+ counters.getCounter(JobCounter.RACK_LOCAL_MAPS),
+ rackLocalMaps);
+ mr.waitUntilIdle();
+ mr.shutdown();
+ }
+
+ static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+ int numMaps, String jobName) throws IOException {
+ jobConf.setJobName(jobName);
+ jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ FileOutputFormat.setOutputPath(jobConf, outputPath);
+ jobConf.setMapperClass(IdentityMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(BytesWritable.class);
+ jobConf.setNumMapTasks(numMaps);
+ jobConf.setNumReduceTasks(0);
+ jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+ return JobClient.runJob(jobConf);
+ }
}
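
With launchJobAndTestCounters and launchJob now local to this class (previously they were reached through TestRackAwareTaskPlacement), the call site earlier in this hunk reads as follows; the comments are illustrative, but the values are exactly the ones already passed above:

    // One map over one file whose datanode sits in a different subtree:
    // expect one "other local" map and no node- or rack-local ones.
    launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath,
                             1 /* numMaps */, 1 /* otherLocalMaps */,
                             0 /* dataLocalMaps */, 0 /* rackLocalMaps */);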
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=791418&r1=791417&r2=791418&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon Jul 6 09:14:48 2009
@@ -15,167 +15,160 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.mapred;
import java.io.IOException;
+import junit.extensions.TestSetup;
+import junit.framework.Test;
import junit.framework.TestCase;
+import junit.framework.TestSuite;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.StaticMapping;
+/**
+ * A JUnit test for rack-aware task placement.
+ */
public class TestRackAwareTaskPlacement extends TestCase {
- private static final String rack1[] = new String[] {
- "/r1"
- };
- private static final String hosts1[] = new String[] {
- "host1.rack1.com"
- };
- private static final String rack2[] = new String[] {
- "/r2", "/r2"
- };
- private static final String hosts2[] = new String[] {
- "host1.rack2.com", "host2.rack2.com"
- };
- private static final String hosts3[] = new String[] {
- "host3.rack1.com"
- };
- private static final String hosts4[] = new String[] {
- "host1.rack2.com"
- };
- final Path inDir = new Path("/racktesting");
- final Path outputPath = new Path("/output");
+
+ static String trackers[] = new String[] {"tracker_tracker1.r1.com:1000",
+ "tracker_tracker2.r1.com:1000", "tracker_tracker3.r2.com:1000",
+ "tracker_tracker4.r3.com:1000"};
- /**
- * Launches a MR job and tests the job counters against the expected values.
- * @param testName The name for the job
- * @param mr The MR cluster
- * @param fileSys The FileSystem
- * @param in Input path
- * @param out Output path
- * @param numMaps Number of maps
- * @param otherLocalMaps Expected value of other local maps
- * @param datalocalMaps Expected value of data(node) local maps
- * @param racklocalMaps Expected value of rack local maps
- */
- static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
- FileSystem fileSys, Path in, Path out,
- int numMaps, int otherLocalMaps,
- int dataLocalMaps, int rackLocalMaps)
- throws IOException {
- JobConf jobConf = mr.createJobConf();
- if (fileSys.exists(out)) {
- fileSys.delete(out, true);
- }
- RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
- Counters counters = job.getCounters();
- assertEquals("Number of local maps",
- counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
- assertEquals("Number of Data-local maps",
- counters.getCounter(JobCounter.DATA_LOCAL_MAPS),
- dataLocalMaps);
- assertEquals("Number of Rack-local maps",
- counters.getCounter(JobCounter.RACK_LOCAL_MAPS),
- rackLocalMaps);
- mr.waitUntilIdle();
- mr.shutdown();
+ static String[] allHosts =
+ new String[] {"tracker1.r1.com", "tracker2.r1.com", "tracker3.r2.com",
+ "tracker4.r3.com"};
+
+ static String[] allRacks =
+ new String[] { "/r1", "/r1", "/r2", "/r3"};
+
+ static FakeJobTracker jobTracker;
+ static String jtIdentifier = "test";
+ private static int jobCounter;
+
+ public static Test suite() {
+ TestSetup setup =
+ new TestSetup(new TestSuite(TestRackAwareTaskPlacement.class)) {
+ protected void setUp() throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("mapred.job.tracker", "localhost:0");
+ conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.setClass("topology.node.switch.mapping.impl",
+ StaticMapping.class, DNSToSwitchMapping.class);
+ jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+ // Set up the Topology Information
+ for (int i = 0; i < allHosts.length; i++) {
+ StaticMapping.addNodeToRack(allHosts[i], allRacks[i]);
+ }
+ for (String tracker : trackers) {
+ FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+ }
+ }
+ };
+ return setup;
}
+
+ static class MyFakeJobInProgress extends JobInProgress {
+ static JobID jobid;
+ int numMaps;
+
+ MyFakeJobInProgress(JobConf jc, JobTracker jt) throws IOException {
+ super((jobid = new JobID(jtIdentifier, jobCounter ++)), jc, jt);
+ Path jobFile = new Path("Dummy");
+ this.profile = new JobProfile(jc.getUser(), jobid,
+ jobFile.toString(), null, jc.getJobName(),
+ jc.getQueueName());
+ this.maxMapsPerNode = jc.getMaxMapsPerNode();
+ this.maxReducesPerNode = jc.getMaxReducesPerNode();
+ this.runningMapLimit = jc.getRunningMapLimit();
+ this.runningReduceLimit = jc.getRunningReduceLimit();
+ }
- public void testTaskPlacement() throws IOException {
- String namenode = null;
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fileSys = null;
- String testName = "TestForRackAwareness";
- try {
- final int taskTrackers = 1;
-
- /* Start 3 datanodes, one in rack r1, and two in r2. Create three
- * files (splits).
- * 1) file1, just after starting the datanode on r1, with
- * a repl factor of 1, and,
- * 2) file2 & file3 after starting the other two datanodes, with a repl
- * factor of 3.
- * At the end, file1 will be present on only datanode1, and, file2 and
- * file3, will be present on all datanodes.
- */
- Configuration conf = new Configuration();
- conf.setBoolean("dfs.replication.considerLoad", false);
- dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
- dfs.waitActive();
- fileSys = dfs.getFileSystem();
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
+ @Override
+ public void initTasks() throws IOException {
+ JobClient.RawSplit[] splits = createSplits();
+ numMapTasks = splits.length;
+ createMapTasks(null, splits);
+ nonRunningMapCache = createCache(splits, maxLevel);
+ tasksInited.set(true);
+ this.status.setRunState(JobStatus.RUNNING);
+
+ }
+
+
+ protected JobClient.RawSplit[] createSplits() throws IOException {
+ RawSplit[] splits = new RawSplit[numMaps];
+ // Hand code for now.
+ // M0,2,3 reside in Host1
+ // M1 resides in Host3
+ // M4 resides in Host4
+ String[] splitHosts0 = new String[] { allHosts[0] };
+
+ for (int i = 0; i < numMaps; i++) {
+ splits[i] = new RawSplit();
+ splits[i].setDataLength(0);
}
- UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
- dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
- dfs.waitActive();
- UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
- UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+ splits[0].setLocations(splitHosts0);
+ splits[2].setLocations(splitHosts0);
+ splits[3].setLocations(splitHosts0);
- namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
- (dfs.getFileSystem()).getUri().getPort();
- /* Run a job with the (only)tasktracker on rack2. The rack location
- * of the tasktracker will determine how many data/rack local maps it
- * runs. The hostname of the tasktracker is set to same as one of the
- * datanodes.
- */
- mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
-
- /* The job is configured with three maps since there are three
- * (non-splittable) files. On rack2, there are two files and both
- * have repl of three. The blocks for those files must therefore be
- * present on all the datanodes, in particular, the datanodes on rack2.
- * The third input file is pulled from rack1.
- */
- launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
- 2, 0);
- mr.shutdown();
-
- /* Run a job with the (only)tasktracker on rack1.
- */
- mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
-
- /* The job is configured with three maps since there are three
- * (non-splittable) files. On rack1, because of the way in which repl
- * was setup while creating the files, we will have all the three files.
- * Thus, a tasktracker will find all inputs in this rack.
- */
- launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
- 0, 3);
- mr.shutdown();
-
- } finally {
- if (dfs != null) {
- dfs.shutdown();
- }
- if (mr != null) {
- mr.shutdown();
- }
+ String[] splitHosts1 = new String[] { allHosts[2] };
+ splits[1].setLocations(splitHosts1);
+
+ String[] splitHosts2 = new String[] { allHosts[3] };
+ splits[4].setLocations(splitHosts2);
+
+ return splits;
}
}
- static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
- int numMaps, String jobName) throws IOException {
- jobConf.setJobName(jobName);
- jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
- FileInputFormat.setInputPaths(jobConf, inDir);
- FileOutputFormat.setOutputPath(jobConf, outputPath);
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setReducerClass(IdentityReducer.class);
- jobConf.setOutputKeyClass(BytesWritable.class);
- jobConf.setOutputValueClass(BytesWritable.class);
- jobConf.setNumMapTasks(numMaps);
- jobConf.setNumReduceTasks(0);
- jobConf.setJar("build/test/mapred/testjar/testjob.jar");
- return JobClient.runJob(jobConf);
+ @SuppressWarnings("deprecation")
+ public void testTaskPlacement() throws IOException {
+ JobConf conf = new JobConf();
+ conf.setNumReduceTasks(0);
+ conf.setJobName("TestTaskPlacement");
+
+ MyFakeJobInProgress jip = new MyFakeJobInProgress(conf, jobTracker);
+ jip.numMaps = 5;
+ jip.initTasks();
+
+ // Tracker1 should get a rack local
+ TaskTrackerStatus tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
+ jip.obtainNewMapTask(tts, 4, 4);
+
+ // Tracker0 should get a data local
+ tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
+ jip.obtainNewMapTask(tts, 4, 4);
+
+ // Tracker2 should get a data local
+ tts = new TaskTrackerStatus(trackers[2], allHosts[2]);
+ jip.obtainNewMapTask(tts, 4, 4);
+
+ // Tracker0 should get a data local
+ tts = new TaskTrackerStatus(trackers[0], allHosts[0]);
+ jip.obtainNewMapTask(tts, 4, 4);
+
+ // Tracker1 should not get any locality at all
+ tts = new TaskTrackerStatus(trackers[1], allHosts[1]);
+ jip.obtainNewMapTask(tts, 4, 4);
+
+
+ Counters counters = jip.getCounters();
+ assertEquals("Number of data local maps", 3,
+ counters.getCounter(JobCounter.DATA_LOCAL_MAPS));
+
+ assertEquals("Number of Rack-local maps", 1 ,
+ counters.getCounter(JobCounter.RACK_LOCAL_MAPS));
+
+ assertEquals("Number of Other-local maps", 0,
+ counters.getCounter(JobCounter.OTHER_LOCAL_MAPS));
+
}
}
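
To reproduce the runtime improvement locally, the single test case could be run through the project's Ant build, assuming the stock trunk build.xml test target, which honors a testcase property (a convention of the era, not something shown in this commit):

    ant test -Dtestcase=TestRackAwareTaskPlacement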