You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/31 21:29:20 UTC
svn commit: r469642 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/
src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Oct 31 12:29:16 2006
New Revision: 469642
URL: http://svn.apache.org/viewvc?view=rev&rev=469642
Log:
HADOOP-482. Fix unit tests to work when a cluster is running on the same machine. Contributed by Wendy Chien.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/test/hadoop-site.xml
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 31 12:29:16 2006
@@ -91,6 +91,9 @@
extend the cache to permit symbolic links to cached items, rather
than local file copies. (Mahadev Konar via cutting)
+25. HADOOP-482. Fix unit tests to work when a cluster is running on
+ the same machine, removing port conflicts. (Wendy Chien via cutting)
+
Release 0.7.2 - 2006-10-18
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Oct 31 12:29:16 2006
@@ -29,7 +29,6 @@
<property name="build.examples" value="${build.dir}/examples"/>
<property name="build.libhdfs" value="${build.dir}/libhdfs"/>
<property name="build.docs" value="${build.dir}/docs"/>
- <property name="build.minimr" value="${build.dir}/minimr"/>
<property name="build.javadoc" value="${build.docs}/api"/>
<property name="build.encoding" value="ISO-8859-1"/>
@@ -74,7 +73,6 @@
<!-- the unit test classpath: uses test.src.dir for configuration -->
<path id="test.classpath">
<pathelement location="${test.build.classes}" />
- <pathelement location="${build.minimr}" />
<pathelement location="${test.src.dir}"/>
<pathelement location="${build.dir}"/>
<pathelement location="${build.examples}"/>
@@ -101,7 +99,6 @@
<mkdir dir="${build.webapps}/dfs/WEB-INF"/>
<mkdir dir="${build.webapps}/datanode/WEB-INF"/>
<mkdir dir="${build.examples}"/>
- <mkdir dir="${build.minimr}"/>
<mkdir dir="${test.build.dir}"/>
<mkdir dir="${test.build.classes}"/>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 31 12:29:16 2006
@@ -87,11 +87,11 @@
}
public static void stopTracker() throws IOException {
- if (tracker == null)
- throw new IOException("Trying to stop JobTracker that is not running.");
runTracker = false;
- tracker.close();
- tracker = null;
+ if (tracker != null) {
+ tracker.close();
+ tracker = null;
+ }
}
public long getProtocolVersion(String protocol, long clientVersion) {
Modified: lucene/hadoop/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/hadoop-site.xml?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/hadoop-site.xml (original)
+++ lucene/hadoop/trunk/src/test/hadoop-site.xml Tue Oct 31 12:29:16 2006
@@ -7,14 +7,12 @@
<configuration>
-<property>
- <name>mapred.local.dir</name>
- <value>build/test/mapred/local</value>
-</property>
<property>
- <name>mapred.system.dir</name>
- <value>build/test/mapred/system</value>
+ <name>hadoop.tmp.dir</name>
+ <value>build/test</value>
+ <description>A base for other temporary directories.</description>
</property>
+
</configuration>
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Oct 31 12:29:16 2006
@@ -34,6 +34,12 @@
private Thread dataNodeThread;
private NameNodeRunner nameNode;
private DataNodeRunner dataNode;
+ private int maxRetries = 10;
+ private int MAX_RETRIES = 10;
+ private int MAX_RETRIES_PER_PORT = 10;
+
+ private int nameNodePort = 0;
+ private int nameNodeInfoPort = 0;
/**
* An inner class that runs a name node.
@@ -111,17 +117,22 @@
}
}
}
-
+
/**
- * Create the config and start up the servers.
+ * Create the config and start up the servers. If either the rpc or info port is already
+ * in use, we will try new ports.
+ * @param namenodePort suggestion for which rpc port to use. caller should use
+ * getNameNodePort() to get the actual port used.
* @param dataNodeFirst should the datanode be brought up before the namenode?
*/
public MiniDFSCluster(int namenodePort,
Configuration conf,
boolean dataNodeFirst) throws IOException {
+
this.conf = conf;
- conf.set("fs.default.name",
- "localhost:"+ Integer.toString(namenodePort));
+
+ this.nameNodePort = namenodePort;
+ this.nameNodeInfoPort = 50080; // We just want this port to be different from the default.
File base_dir = new File(System.getProperty("test.build.data"),
"dfs/");
conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
@@ -131,27 +142,66 @@
// this timeout seems to control the minimum time for the test, so
// decrease it considerably.
conf.setInt("ipc.client.timeout", 1000);
- NameNode.format(conf);
- nameNode = new NameNodeRunner();
- nameNodeThread = new Thread(nameNode);
- dataNode = new DataNodeRunner();
- dataNodeThread = new Thread(dataNode);
- if (dataNodeFirst) {
- dataNodeThread.start();
- nameNodeThread.start();
- } else {
- nameNodeThread.start();
- dataNodeThread.start();
- }
- while (!nameNode.isUp()) {
- try { // let daemons get started
- System.out.println("waiting for dfs minicluster to start");
- Thread.sleep(1000);
- } catch(InterruptedException e) {
+
+ // Loops until we find ports that work or we give up because
+ // too many tries have failed.
+ boolean foundPorts = false;
+ int portsTried = 0;
+ while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
+ conf.set("fs.default.name",
+ "localhost:"+ Integer.toString(nameNodePort));
+ conf.set("dfs.info.port", nameNodeInfoPort);
+
+ NameNode.format(conf);
+ nameNode = new NameNodeRunner();
+ nameNodeThread = new Thread(nameNode);
+ dataNode = new DataNodeRunner();
+ dataNodeThread = new Thread(dataNode);
+ if (dataNodeFirst) {
+ dataNodeThread.start();
+ nameNodeThread.start();
+ } else {
+ nameNodeThread.start();
+ dataNodeThread.start();
}
+
+ int retry = 0;
+ while (!nameNode.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
+ try { // let daemons get started
+ System.out.println("waiting for dfs minicluster to start");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ }
+ retry++;
+ }
+ if (retry >= MAX_RETRIES_PER_PORT) {
+ this.nameNodePort += 3;
+ this.nameNodeInfoPort += 7;
+ System.out.println("Failed to start DFS minicluster in " + retry + " attempts. Trying new ports:");
+ System.out.println("\tNameNode RPC port: " + nameNodePort);
+ System.out.println("\tNameNode info port: " + nameNodeInfoPort);
+
+ nameNode.shutdown();
+ dataNode.shutdown();
+
+ } else {
+ foundPorts = true;
+ }
+ portsTried++;
+ }
+ if (portsTried >= MAX_RETRIES) {
+ throw new IOException("Failed to start a DFS minicluster after trying " + portsTried + " ports.");
}
}
-
+
+ /**
+ * Returns the rpc port used by the mini cluster, because the caller supplied port is
+ * not necessarily the actual port used.
+ */
+ public int getNameNodePort() {
+ return nameNodePort;
+ }
+
/**
* Shut down the servers.
*/
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Oct 31 12:29:16 2006
@@ -32,7 +32,7 @@
private int jobTrackerPort = 0;
private int taskTrackerPort = 0;
-
+ private int jobTrackerInfoPort = 0;
private int numTaskTrackers;
private List taskTrackerList = new ArrayList();
@@ -40,10 +40,17 @@
private String namenode;
+ private int MAX_RETRIES_PER_PORT = 10;
+ private int MAX_RETRIES = 10;
+
/**
* An inner class that runs a job tracker.
*/
class JobTrackerRunner implements Runnable {
+
+ public boolean isUp() {
+ return (JobTracker.getTracker() != null);
+ }
/**
* Create the job tracker and run it.
*/
@@ -52,6 +59,7 @@
JobConf jc = new JobConf();
jc.set("fs.name.node", namenode);
jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+ jc.set("mapred.job.tracker.info.port", jobTrackerInfoPort);
// this timeout seems to control the minimum time for the test, so
// set it down at 2 seconds.
jc.setInt("ipc.client.timeout", 1000);
@@ -194,9 +202,18 @@
}
}
}
-
+
+ /**
+ * Get the actual rpc port used.
+ */
+ public int getJobTrackerPort() {
+ return jobTrackerPort;
+ }
+
/**
- * Create the config and start up the servers.
+ * Create the config and start up the servers. The ports supplied by the user are
+ * just used as suggestions. If those ports are already in use, new ports
+ * are tried. The caller should call getJobTrackerPort to get the actual rpc port used.
*/
public MiniMRCluster(int jobTrackerPort,
int taskTrackerPort,
@@ -211,39 +228,65 @@
int numTaskTrackers,
String namenode,
boolean taskTrackerFirst, int numDir) throws IOException {
+
this.jobTrackerPort = jobTrackerPort;
this.taskTrackerPort = taskTrackerPort;
+ this.jobTrackerInfoPort = 50030;
this.numTaskTrackers = numTaskTrackers;
this.namenode = namenode;
-
- File configDir = new File("build", "minimr");
- configDir.mkdirs();
- File siteFile = new File(configDir, "hadoop-site.xml");
- PrintWriter pw = new PrintWriter(siteFile);
- pw.print("<?xml version=\"1.0\"?>\n"+
- "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
- "<configuration>\n"+
- " <property>\n"+
- " <name>mapred.system.dir</name>\n"+
- " <value>build/test/mapred/system</value>\n"+
- " </property>\n"+
- "</configuration>\n");
- pw.close();
- jobTracker = new JobTrackerRunner();
- jobTrackerThread = new Thread(jobTracker);
- if (!taskTrackerFirst) {
- jobTrackerThread.start();
- }
- for (int idx = 0; idx < numTaskTrackers; idx++) {
+
+ // Loop until we find a set of ports that are all unused or until we
+ // give up because it's taken too many tries.
+ boolean foundPorts = false;
+ int portsTried = 0;
+ while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
+ jobTracker = new JobTrackerRunner();
+ jobTrackerThread = new Thread(jobTracker);
+ if (!taskTrackerFirst) {
+ jobTrackerThread.start();
+ }
+ for (int idx = 0; idx < numTaskTrackers; idx++) {
TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
Thread taskTrackerThread = new Thread(taskTracker);
taskTrackerThread.start();
taskTrackerList.add(taskTracker);
taskTrackerThreadList.add(taskTrackerThread);
+ }
+ if (taskTrackerFirst) {
+ jobTrackerThread.start();
+ }
+ int retry = 0;
+ while (!jobTracker.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
+ try { // let daemons get started
+ System.err.println("waiting for jobtracker to start");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ }
+ retry++;
+ }
+ if (retry >= MAX_RETRIES_PER_PORT) {
+ // Try new ports.
+ this.jobTrackerPort += 7;
+ this.jobTrackerInfoPort += 3;
+ this.taskTrackerPort++;
+
+ System.err.println("Failed to start MR minicluster in " + retry +
+ " attempts. Retrying with new ports:");
+ System.err.println("\tJobTracker RPC port = " + jobTrackerPort);
+ System.err.println("\tJobTracker info port = " + jobTrackerInfoPort);
+ System.err.println("\tTaskTracker RPC port(s) = " +
+ taskTrackerPort + "-" + (taskTrackerPort+numTaskTrackers-1));
+ shutdown();
+ taskTrackerList.clear();
+ } else {
+ foundPorts = true;
+ }
+ portsTried++;
}
- if (taskTrackerFirst) {
- jobTrackerThread.start();
+ if (portsTried >= MAX_RETRIES) {
+ throw new IOException("Failed to start MR minicluster after trying " + portsTried + " ports.");
}
+
waitUntilIdle();
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Tue Oct 31 12:29:16 2006
@@ -101,15 +101,14 @@
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
- final int jobTrackerPort = 50050;
- final String jobTrackerName = "localhost:" + jobTrackerPort;
+ final int jobTrackerPort = 60050;
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(65315, conf, true);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
namenode, true, 2);
-
+ final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = new JobConf();
boolean result;
result = launchEmptyJob(namenode, jobTrackerName, jobConf,
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Oct 31 12:29:16 2006
@@ -96,8 +96,8 @@
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
- final int jobTrackerPort = 50050;
- final String jobTrackerName = "localhost:" + jobTrackerPort;
+ final int jobTrackerPort = 60050;
+
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(65314, conf, true);
fileSys = dfs.getFileSystem();
@@ -106,6 +106,7 @@
namenode, true, 3);
JobConf jobConf = new JobConf();
String result;
+ final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
result = launchWordCount(namenode, jobTrackerName, jobConf,
"The quick brown fox\nhas many silly\n" +
"red fox sox\n",
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Tue Oct 31 12:29:16 2006
@@ -44,9 +44,9 @@
dfs = new MiniDFSCluster(65314, conf, true);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(50050, 50060, 2, namenode, true, 4);
+ mr = new MiniMRCluster(60050, 50060, 2, namenode, true, 4);
// run the wordcount example with caching
- boolean ret = MRCaching.launchMRCache("localhost:50050",
+ boolean ret = MRCaching.launchMRCache("localhost:"+mr.getJobTrackerPort(),
"/testing/wc/input",
"/testing/wc/output", namenode,
conf,
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Oct 31 12:29:16 2006
@@ -35,12 +35,13 @@
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
- double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
+ String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobTrackerName, "local");
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
JobConf jconf = new JobConf();
// run the wordcount example with caching
- boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input",
+ boolean ret = MRCaching.launchMRCache(jobTrackerName, "/tmp/wc/input",
"/tmp/wc/output", "local", jconf,
"The quick brown fox\nhas many silly\n"
+ "red fox sox\n");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Oct 31 12:29:16 2006
@@ -145,14 +145,15 @@
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
- final int jobTrackerPort = 50050;
- final String jobTrackerName = "localhost:" + jobTrackerPort;
+ final int jobTrackerPort = 60050;
+
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(65314, conf, true);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers,
namenode, true);
+ final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES,
jobTrackerName, namenode);
double error = Math.abs(Math.PI - estimate);