You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/05/19 23:08:16 UTC
svn commit: r657979 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Mon May 19 14:08:15 2008
New Revision: 657979
URL: http://svn.apache.org/viewvc?rev=657979&view=rev
Log:
HADOOP-3296. Fix task cache to work for more than two levels in the cache
hierarchy. This also adds a new counter to track cache hits at levels greater
than two. Contributed by Amar Kamat.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 19 14:08:15 2008
@@ -284,6 +284,10 @@
HADOOP-3409. Namenode should save the root inode into fsimage. (hairong)
+ HADOOP-3296. Fix task cache to work for more than two levels in the cache
+ hierarchy. This also adds a new counter to track cache hits at levels
+ greater than two. (Amar Kamat via cdouglas)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon May 19 14:08:15 2008
@@ -137,6 +137,7 @@
NUM_FAILED_REDUCES,
TOTAL_LAUNCHED_MAPS,
TOTAL_LAUNCHED_REDUCES,
+ OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
RACK_LOCAL_MAPS
}
@@ -267,7 +268,6 @@
Node node = jobtracker.resolveAndAddToTopology(host);
LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
for (int j = 0; j < maxLevel; j++) {
- node = JobTracker.getParentNode(node, j);
List<TaskInProgress> hostMaps = cache.get(node);
if (hostMaps == null) {
hostMaps = new ArrayList<TaskInProgress>();
@@ -282,6 +282,7 @@
if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
hostMaps.add(maps[i]);
}
+ node = node.getParent();
}
}
}
@@ -1090,6 +1091,7 @@
} else {
LOG.info("Choosing cached task at level " + level
+ tip.getTIPId());
+ jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
}
return tip.getIdWithinJob();
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties Mon May 19 14:08:15 2008
@@ -6,6 +6,7 @@
NUM_FAILED_REDUCES.name= Failed reduce tasks
TOTAL_LAUNCHED_MAPS.name= Launched map tasks
TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks
+OTHER_LOCAL_MAPS.name= Other local map tasks
DATA_LOCAL_MAPS.name= Data-local map tasks
RACK_LOCAL_MAPS.name= Rack-local map tasks
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon May 19 14:08:15 2008
@@ -57,6 +57,10 @@
JobConf jc = null;
+ public JobTrackerRunner(JobConf conf) {
+ jc = conf;
+ }
+
public boolean isUp() {
return (tracker != null);
}
@@ -74,7 +78,7 @@
*/
public void run() {
try {
- jc = createJobConf();
+ jc = (jc == null) ? createJobConf() : createJobConf(jc);
jc.set("mapred.local.dir","build/test/mapred/local");
jc.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
@@ -246,7 +250,11 @@
}
public JobConf createJobConf() {
- JobConf result = new JobConf();
+ return createJobConf(new JobConf());
+ }
+
+ public JobConf createJobConf(JobConf conf) {
+ JobConf result = new JobConf(conf);
FileSystem.setDefaultUri(result, namenode);
result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
result.set("mapred.job.tracker.http.address",
@@ -278,6 +286,22 @@
* @param numTaskTrackers no. of tasktrackers in the cluster
* @param namenode the namenode
* @param numDir no. of directories
+ * @param racks Array of racks
+ * @param hosts Array of hosts in the corresponding racks
+ * @param conf Default conf for the jobtracker
+ * @throws IOException
+ */
+ public MiniMRCluster(int numTaskTrackers, String namenode, int numDir,
+ String[] racks, String[] hosts, JobConf conf)
+ throws IOException {
+ this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf);
+ }
+
+ /**
+ * Create the config and the cluster.
+ * @param numTaskTrackers no. of tasktrackers in the cluster
+ * @param namenode the namenode
+ * @param numDir no. of directories
* @throws IOException
*/
public MiniMRCluster(int numTaskTrackers, String namenode, int numDir)
@@ -319,6 +343,14 @@
int numTaskTrackers, String namenode,
int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi
) throws IOException {
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode,
+ numDir, racks, hosts, ugi, null);
+ }
+
+ public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+ int numTaskTrackers, String namenode,
+ int numDir, String[] racks, String[] hosts, UnixUserGroupInformation ugi,
+ JobConf conf) throws IOException {
if (racks != null && racks.length < numTaskTrackers) {
LOG.error("Invalid number of racks specified. It should be at least " +
"equal to the number of tasktrackers");
@@ -344,7 +376,7 @@
this.ugi = ugi;
// Create the JobTracker
- jobTracker = new JobTrackerRunner();
+ jobTracker = new JobTrackerRunner(conf);
jobTrackerThread = new Thread(jobTracker);
jobTrackerThread.start();
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=657979&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Mon May 19 14:08:15 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestRackAwareTaskPlacement;
+
+/**
+ * This test checks whether the task caches are created and used properly.
+ */
+public class TestMultipleLevelCaching extends TestCase {
+ private static final int MAX_LEVEL = 5;
+ final Path inDir = new Path("/cachetesting");
+ final Path outputPath = new Path("/output");
+
+ /**
+ * Returns a string representing a rack with level + 1 nodes in the topology
+ * for the rack.
+ * For id = 2, level = 2 we get /a/b2/c2
+ * id = 1, level = 3 we get /a/b1/c1/d1
+ * NOTE There should always be one shared node i.e /a
+ * @param id Unique Id for the rack
+ * @param level Number of unshared (id-suffixed) levels below the shared root /a
+ */
+ private static String getRack(int id, int level) {
+ StringBuilder rack = new StringBuilder();
+ char alpha = 'a';
+ int length = level + 1;
+ while (length > level) {
+ rack.append("/");
+ rack.append(alpha);
+ ++alpha;
+ --length;
+ }
+ while (length > 0) {
+ rack.append("/");
+ rack.append(alpha);
+ rack.append(id);
+ ++alpha;
+ --length;
+ }
+ return rack.toString();
+ }
+
+ public void testMultiLevelCaching() throws IOException {
+ for (int i = 1 ; i <= MAX_LEVEL; ++i) {
+ testCachingAtLevel(i);
+ }
+ }
+
+ private void testCachingAtLevel(int level) throws IOException {
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ String testName = "TestMultiLevelCaching";
+ try {
+ final int taskTrackers = 1;
+ // generate the racks
+ // use rack1 for data node
+ String rack1 = getRack(0, level);
+ // use rack2 for task tracker
+ String rack2 = getRack(1, level);
+ Configuration conf = new Configuration();
+ // Run a datanode on host1 under rack1, e.g. /a/b0/c0 for level = 2
+ dfs = new MiniDFSCluster(conf, 1, true, new String[] {rack1},
+ new String[] {"host1.com"});
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf,
+ new Path(inDir + "/file"), (short)1);
+ namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+ (dfs.getFileSystem()).getUri().getPort();
+
+ // Run a job with the (only) tasktracker on host2 under a different
+ // topology (rack2), e.g. /a/b1/c1 for level = 2.
+ JobConf jc = new JobConf();
+ // cache-level = level (unshared levels) + 1(topmost shared node i.e /a)
+ // + 1 (for host)
+ jc.setInt("mapred.task.cache.levels", level + 2);
+ mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2},
+ new String[] {"host2.com"}, jc);
+
+ /* The job is configured with 1 map for one (non-splittable) file.
+ * Since the datanode is running under a different subtree, there is no
+ * node-level data locality but there should be topological locality.
+ */
+ TestRackAwareTaskPlacement.launchJobAndTestCounters(
+ testName, mr, fileSys, inDir, outputPath, 1, 1, 0, 0);
+ mr.shutdown();
+ } finally {
+ fileSys.delete(inDir, true);
+ fileSys.delete(outputPath, true);
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=657979&r1=657978&r2=657979&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon May 19 14:08:15 2008
@@ -55,11 +55,47 @@
final Path inDir = new Path("/racktesting");
final Path outputPath = new Path("/output");
+ /**
+ * Launches a MR job and tests the job counters against the expected values.
+ * @param jobName The name for the job
+ * @param mr The MR cluster
+ * @param fileSys The FileSystem
+ * @param in Input path
+ * @param out Output path
+ * @param numMaps Number of maps
+ * @param otherLocalMaps Expected value of other local maps
+ * @param dataLocalMaps Expected value of data(node) local maps
+ * @param rackLocalMaps Expected value of rack local maps
+ */
+ static void launchJobAndTestCounters(String jobName, MiniMRCluster mr,
+ FileSystem fileSys, Path in, Path out,
+ int numMaps, int otherLocalMaps,
+ int dataLocalMaps, int rackLocalMaps)
+ throws IOException {
+ JobConf jobConf = mr.createJobConf();
+ if (fileSys.exists(out)) {
+ fileSys.delete(out, true);
+ }
+ RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
+ Counters counters = job.getCounters();
+ assertEquals("Number of local maps",
+ counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+ assertEquals("Number of Data-local maps",
+ counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS),
+ dataLocalMaps);
+ assertEquals("Number of Rack-local maps",
+ counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS),
+ rackLocalMaps);
+ mr.waitUntilIdle();
+ mr.shutdown();
+ }
+
public void testTaskPlacement() throws IOException {
String namenode = null;
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
+ String testName = "TestForRackAwareness";
try {
final int taskTrackers = 1;
@@ -95,42 +131,29 @@
* datanodes.
*/
mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
- JobConf jobConf = mr.createJobConf();
- if (fileSys.exists(outputPath)) {
- fileSys.delete(outputPath, true);
- }
+
/* The job is configured with three maps since there are three
* (non-splittable) files. On rack2, there are two files and both
* have repl of three. The blocks for those files must therefore be
* present on all the datanodes, in particular, the datanodes on rack2.
* The third input file is pulled from rack1.
*/
- RunningJob job = launchJob(jobConf, 3);
- Counters counters = job.getCounters();
- assertEquals("Number of Data-local maps",
- counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 2);
- assertEquals("Number of Rack-local maps",
- counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 0);
- mr.waitUntilIdle();
+ launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+ 2, 0);
+ mr.shutdown();
/* Run a job with the (only)tasktracker on rack1.
*/
mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
- jobConf = mr.createJobConf();
- fileSys = dfs.getFileSystem();
- if (fileSys.exists(outputPath)) {
- fileSys.delete(outputPath, true);
- }
+
/* The job is configured with three maps since there are three
* (non-splittable) files. On rack1, because of the way in which repl
* was setup while creating the files, we will have all the three files.
* Thus, a tasktracker will find all inputs in this rack.
*/
- job = launchJob(jobConf, 3);
- counters = job.getCounters();
- assertEquals("Number of Rack-local maps",
- counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 3);
- mr.waitUntilIdle();
+ launchJobAndTestCounters(testName, mr, fileSys, inDir, outputPath, 3, 0,
+ 0, 3);
+ mr.shutdown();
} finally {
if (dfs != null) {
@@ -141,7 +164,7 @@
}
}
}
- private void writeFile(NameNode namenode, Configuration conf, Path name,
+ static void writeFile(NameNode namenode, Configuration conf, Path name,
short replication) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer =
@@ -153,7 +176,7 @@
fileSys.setReplication(name, replication);
waitForReplication(fileSys, namenode, name, replication);
}
- private void waitForReplication(FileSystem fileSys, NameNode namenode,
+ static void waitForReplication(FileSystem fileSys, NameNode namenode,
Path name, short replication) throws IOException {
//wait for the replication to happen
boolean isReplicationDone;
@@ -173,8 +196,10 @@
}
} while(!isReplicationDone);
}
- private RunningJob launchJob(JobConf jobConf, int numMaps) throws IOException {
- jobConf.setJobName("TestForRackAwareness");
+
+ static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
+ int numMaps, String jobName) throws IOException {
+ jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);