Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/31 21:29:20 UTC

svn commit: r469642 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Oct 31 12:29:16 2006
New Revision: 469642

URL: http://svn.apache.org/viewvc?view=rev&rev=469642
Log:
HADOOP-482.  Fix unit tests to work when a cluster is running on the same machine.  Contributed by Wendy.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/test/hadoop-site.xml
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 31 12:29:16 2006
@@ -91,6 +91,9 @@
     extend the cache to permit symbolic links to cached items, rather
     than local file copies.  (Mahadev Konar via cutting)
 
+25. HADOOP-482.  Fix unit tests to work when a cluster is running on
+    the same machine, removing port conflicts.  (Wendy Chien via cutting)
+
 
 Release 0.7.2 - 2006-10-18
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Oct 31 12:29:16 2006
@@ -29,7 +29,6 @@
   <property name="build.examples" value="${build.dir}/examples"/>
   <property name="build.libhdfs" value="${build.dir}/libhdfs"/>
   <property name="build.docs" value="${build.dir}/docs"/>
-  <property name="build.minimr" value="${build.dir}/minimr"/>
   <property name="build.javadoc" value="${build.docs}/api"/>
   <property name="build.encoding" value="ISO-8859-1"/>
 
@@ -74,7 +73,6 @@
   <!-- the unit test classpath: uses test.src.dir for configuration -->
   <path id="test.classpath">
     <pathelement location="${test.build.classes}" />
-    <pathelement location="${build.minimr}" />
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
     <pathelement location="${build.examples}"/>
@@ -101,7 +99,6 @@
     <mkdir dir="${build.webapps}/dfs/WEB-INF"/>
     <mkdir dir="${build.webapps}/datanode/WEB-INF"/>
     <mkdir dir="${build.examples}"/>
-    <mkdir dir="${build.minimr}"/>
  
     <mkdir dir="${test.build.dir}"/>
     <mkdir dir="${test.build.classes}"/>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 31 12:29:16 2006
@@ -87,11 +87,11 @@
     }
 
     public static void stopTracker() throws IOException {
-      if (tracker == null)
-        throw new IOException("Trying to stop JobTracker that is not running.");
       runTracker = false;
-      tracker.close();
-      tracker = null;
+      if (tracker != null) {
+        tracker.close();
+        tracker = null;
+      }
     }
     
     public long getProtocolVersion(String protocol, long clientVersion) {

Modified: lucene/hadoop/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/hadoop-site.xml?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/hadoop-site.xml (original)
+++ lucene/hadoop/trunk/src/test/hadoop-site.xml Tue Oct 31 12:29:16 2006
@@ -7,14 +7,12 @@
 
 <configuration>
 
-<property>
-  <name>mapred.local.dir</name>
-  <value>build/test/mapred/local</value>
-</property>
 
 <property>
-  <name>mapred.system.dir</name>
-  <value>build/test/mapred/system</value>
+  <name>hadoop.tmp.dir</name>
+  <value>build/test</value>
+  <description>A base for other temporary directories.</description>
 </property>
+
 
 </configuration>

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Oct 31 12:29:16 2006
@@ -34,6 +34,12 @@
   private Thread dataNodeThread;
   private NameNodeRunner nameNode;
   private DataNodeRunner dataNode;
+  private int maxRetries = 10;
+  private int MAX_RETRIES  = 10;
+  private int MAX_RETRIES_PER_PORT = 10;
+
+  private int nameNodePort = 0;
+  private int nameNodeInfoPort = 0;
 
   /**
    * An inner class that runs a name node.
@@ -111,17 +117,22 @@
       }
     }
   }
-  
+
   /**
-   * Create the config and start up the servers.
+   * Create the config and start up the servers.  If either the rpc or info port is already 
+   * in use, we will try new ports.
+   * @param namenodePort suggestion for which rpc port to use.  caller should use 
+   *                     getNameNodePort() to get the actual port used.   
    * @param dataNodeFirst should the datanode be brought up before the namenode?
    */
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
+
     this.conf = conf;
-    conf.set("fs.default.name", 
-             "localhost:"+ Integer.toString(namenodePort));
+
+    this.nameNodePort = namenodePort;
+    this.nameNodeInfoPort = 50080;   // We just want this port to be different from the default. 
     File base_dir = new File(System.getProperty("test.build.data"),
                              "dfs/");
     conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
@@ -131,27 +142,66 @@
     // this timeout seems to control the minimum time for the test, so
     // decrease it considerably.
     conf.setInt("ipc.client.timeout", 1000);
-    NameNode.format(conf);
-    nameNode = new NameNodeRunner();
-    nameNodeThread = new Thread(nameNode);
-    dataNode = new DataNodeRunner();
-    dataNodeThread = new Thread(dataNode);
-    if (dataNodeFirst) {
-      dataNodeThread.start();      
-      nameNodeThread.start();      
-    } else {
-      nameNodeThread.start();
-      dataNodeThread.start();      
-    }
-    while (!nameNode.isUp()) {
-      try {                                     // let daemons get started
-        System.out.println("waiting for dfs minicluster to start");
-        Thread.sleep(1000);
-      } catch(InterruptedException e) {
+
+    // Loops until we find ports that work or we give up because 
+    // too many tries have failed.
+    boolean foundPorts = false;
+    int portsTried = 0;
+    while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
+      conf.set("fs.default.name", 
+               "localhost:"+ Integer.toString(nameNodePort));
+      conf.set("dfs.info.port", nameNodeInfoPort);
+      
+      NameNode.format(conf);
+      nameNode = new NameNodeRunner();
+      nameNodeThread = new Thread(nameNode);
+      dataNode = new DataNodeRunner();
+      dataNodeThread = new Thread(dataNode);
+      if (dataNodeFirst) {
+        dataNodeThread.start();      
+        nameNodeThread.start();      
+      } else {
+        nameNodeThread.start();
+        dataNodeThread.start();      
       }
+
+      int retry = 0;
+      while (!nameNode.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
+        try {                                     // let daemons get started
+          System.out.println("waiting for dfs minicluster to start");
+          Thread.sleep(1000);
+        } catch(InterruptedException e) {
+        }
+        retry++;
+      }
+      if (retry >= MAX_RETRIES_PER_PORT) {
+        this.nameNodePort += 3;
+        this.nameNodeInfoPort += 7;
+        System.out.println("Failed to start DFS minicluster in " + retry + " attempts.  Trying new ports:");
+        System.out.println("\tNameNode RPC port: " + nameNodePort);
+        System.out.println("\tNameNode info port: " + nameNodeInfoPort);
+
+        nameNode.shutdown();
+        dataNode.shutdown();
+        
+      } else {
+        foundPorts = true;
+      }
+      portsTried++;
+    } 
+    if (portsTried >= MAX_RETRIES) {
+        throw new IOException("Failed to start a DFS minicluster after trying " + portsTried + " ports.");
     }
   }
-  
+
+  /**
+   * Returns the rpc port used by the mini cluster, because the caller supplied port is 
+   * not necessarily the actual port used.
+   */     
+  public int getNameNodePort() {
+    return nameNodePort;
+  }
+    
   /**
    * Shut down the servers.
    */

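For reference, a minimal usage sketch of the changed MiniDFSCluster API. The port number, the wrapper class, and the shutdown() call are illustrative assumptions; the constructor, getFileSystem(), fileSys.getName(), and the new getNameNodePort() accessor are the ones that appear in the diffs in this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.MiniDFSCluster;
    import org.apache.hadoop.fs.FileSystem;

    public class MiniDFSClusterUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 65312 is only a suggestion; the constructor now retries other ports
        // if the rpc or info port is already in use on this machine.
        MiniDFSCluster dfs = new MiniDFSCluster(65312, conf, true);
        try {
          FileSystem fileSys = dfs.getFileSystem();
          // Read back the rpc port the namenode actually bound instead of
          // hard-coding the suggested value.
          System.out.println("namenode rpc port: " + dfs.getNameNodePort());
          System.out.println("namenode uri: " + fileSys.getName());
        } finally {
          dfs.shutdown();   // assumed; "Shut down the servers" method above
        }
      }
    }
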
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Oct 31 12:29:16 2006
@@ -32,7 +32,7 @@
     
     private int jobTrackerPort = 0;
     private int taskTrackerPort = 0;
-    
+    private int jobTrackerInfoPort = 0;
     private int numTaskTrackers;
     
     private List taskTrackerList = new ArrayList();
@@ -40,10 +40,17 @@
     
     private String namenode;
     
+    private int MAX_RETRIES_PER_PORT = 10;
+    private int MAX_RETRIES = 10;
+
     /**
      * An inner class that runs a job tracker.
      */
     class JobTrackerRunner implements Runnable {
+
+        public boolean isUp() {
+            return (JobTracker.getTracker() != null);
+        }
         /**
          * Create the job tracker and run it.
          */
@@ -52,6 +59,7 @@
                 JobConf jc = new JobConf();
                 jc.set("fs.name.node", namenode);
                 jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+                jc.set("mapred.job.tracker.info.port", jobTrackerInfoPort);
                 // this timeout seems to control the minimum time for the test, so
                 // set it down at 2 seconds.
                 jc.setInt("ipc.client.timeout", 1000);
@@ -194,9 +202,18 @@
         }
       }
     }
-    
+
+    /** 
+     * Get the actual rpc port used.
+     */
+    public int getJobTrackerPort() {
+        return jobTrackerPort;
+    }
+
     /**
-     * Create the config and start up the servers.
+     * Create the config and start up the servers.  The ports supplied by the user are
+     * just used as suggestions.  If those ports are already in use, new ports
+     * are tried.  The caller should call getJobTrackerPort to get the actual rpc port used.
      */
     public MiniMRCluster(int jobTrackerPort,
                          int taskTrackerPort,
@@ -211,39 +228,65 @@
             int numTaskTrackers,
             String namenode,
             boolean taskTrackerFirst, int numDir) throws IOException {
+        
         this.jobTrackerPort = jobTrackerPort;
         this.taskTrackerPort = taskTrackerPort;
+        this.jobTrackerInfoPort = 50030;
         this.numTaskTrackers = numTaskTrackers;
         this.namenode = namenode;
-        
-        File configDir = new File("build", "minimr");
-        configDir.mkdirs();
-        File siteFile = new File(configDir, "hadoop-site.xml");
-        PrintWriter pw = new PrintWriter(siteFile);
-        pw.print("<?xml version=\"1.0\"?>\n"+
-                "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
-                "<configuration>\n"+
-                " <property>\n"+
-                "   <name>mapred.system.dir</name>\n"+
-                "   <value>build/test/mapred/system</value>\n"+
-                " </property>\n"+
-                "</configuration>\n");
-        pw.close();
-        jobTracker = new JobTrackerRunner();
-        jobTrackerThread = new Thread(jobTracker);
-        if (!taskTrackerFirst) {
-          jobTrackerThread.start();
-        }
-        for (int idx = 0; idx < numTaskTrackers; idx++) {
+
+        // Loop until we find a set of ports that are all unused or until we
+        // give up because it's taken too many tries.
+        boolean foundPorts = false;
+        int portsTried = 0;
+        while ((!foundPorts) && (portsTried < MAX_RETRIES)) {
+          jobTracker = new JobTrackerRunner();
+          jobTrackerThread = new Thread(jobTracker);
+          if (!taskTrackerFirst) {
+            jobTrackerThread.start();
+          }
+          for (int idx = 0; idx < numTaskTrackers; idx++) {
             TaskTrackerRunner taskTracker = new TaskTrackerRunner(numDir);
             Thread taskTrackerThread = new Thread(taskTracker);
             taskTrackerThread.start();
             taskTrackerList.add(taskTracker);
             taskTrackerThreadList.add(taskTrackerThread);
+          }
+          if (taskTrackerFirst) {
+            jobTrackerThread.start();
+          }
+          int retry = 0;
+          while (!jobTracker.isUp() && (retry < MAX_RETRIES_PER_PORT)) {
+            try {                                     // let daemons get started
+              System.err.println("waiting for jobtracker to start");
+              Thread.sleep(1000);
+            } catch(InterruptedException e) {
+            }
+            retry++;
+          }
+          if (retry >= MAX_RETRIES_PER_PORT) {
+              // Try new ports.
+              this.jobTrackerPort += 7;
+              this.jobTrackerInfoPort += 3;
+              this.taskTrackerPort++;
+
+              System.err.println("Failed to start MR minicluster in " + retry + 
+                                 " attempts.  Retrying with new ports:");
+              System.err.println("\tJobTracker RPC port = " + jobTrackerPort);
+              System.err.println("\tJobTracker info port = " + jobTrackerInfoPort);
+              System.err.println("\tTaskTracker RPC port(s) = " + 
+                                 taskTrackerPort + "-" + (taskTrackerPort+numTaskTrackers-1));
+              shutdown();
+              taskTrackerList.clear();
+          } else {
+            foundPorts = true;
+          }
+          portsTried++;
         }
-        if (taskTrackerFirst) {
-          jobTrackerThread.start();
+        if (portsTried >= MAX_RETRIES) {
+            throw new IOException("Failed to start MR minicluster after trying " + portsTried + " ports.");
         }
+        
         waitUntilIdle();
     }
     

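The test updates below derive the job tracker address from the cluster rather than from a fixed constant; condensed into one self-contained sketch (the ports, tracker counts, and shutdown ordering are illustrative, while the six-argument constructor and getJobTrackerPort() are the ones shown above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.MiniDFSCluster;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.MiniMRCluster;

    public class MiniMRClusterUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Suggested namenode port; the DFS minicluster may move to another one.
        MiniDFSCluster dfs = new MiniDFSCluster(65314, conf, true);
        MiniMRCluster mr = null;
        try {
          FileSystem fileSys = dfs.getFileSystem();
          String namenode = fileSys.getName();
          // 60050/50060 are only suggestions; the MR minicluster retries new
          // ports when these are already taken.
          mr = new MiniMRCluster(60050, 50060, 2, namenode, true, 2);
          // Build the job tracker address from the port actually bound,
          // never from the suggested constant.
          String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
          System.out.println("job tracker at " + jobTrackerName);
        } finally {
          if (mr != null) { mr.shutdown(); }   // shutdown() is called in the diff above
          dfs.shutdown();
        }
      }
    }
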
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Tue Oct 31 12:29:16 2006
@@ -101,15 +101,14 @@
       FileSystem fileSys = null;
       try {
           final int taskTrackers = 4;
-          final int jobTrackerPort = 50050;
-          final String jobTrackerName = "localhost:" + jobTrackerPort;
+          final int jobTrackerPort = 60050;
           Configuration conf = new Configuration();
           dfs = new MiniDFSCluster(65315, conf, true);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
           mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
                                  namenode, true, 2);
-
+          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
           JobConf jobConf = new JobConf();
           boolean result;
           result = launchEmptyJob(namenode, jobTrackerName, jobConf, 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Oct 31 12:29:16 2006
@@ -96,8 +96,8 @@
       FileSystem fileSys = null;
       try {
           final int taskTrackers = 4;
-          final int jobTrackerPort = 50050;
-          final String jobTrackerName = "localhost:" + jobTrackerPort;
+          final int jobTrackerPort = 60050;
+
           Configuration conf = new Configuration();
           dfs = new MiniDFSCluster(65314, conf, true);
           fileSys = dfs.getFileSystem();
@@ -106,6 +106,7 @@
                                  namenode, true, 3);
           JobConf jobConf = new JobConf();
           String result;
+          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
           result = launchWordCount(namenode, jobTrackerName, jobConf, 
                                    "The quick brown fox\nhas many silly\n" + 
                                    "red fox sox\n",

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Tue Oct 31 12:29:16 2006
@@ -44,9 +44,9 @@
       dfs = new MiniDFSCluster(65314, conf, true);
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getName();
-      mr = new MiniMRCluster(50050, 50060, 2, namenode, true, 4);
+      mr = new MiniMRCluster(60050, 50060, 2, namenode, true, 4);
       // run the wordcount example with caching
-      boolean ret = MRCaching.launchMRCache("localhost:50050",
+      boolean ret = MRCaching.launchMRCache("localhost:"+mr.getJobTrackerPort(),
                                             "/testing/wc/input",
                                             "/testing/wc/output", namenode,
                                             conf,

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Oct 31 12:29:16 2006
@@ -35,12 +35,13 @@
       MiniMRCluster mr = null;
       try {
           mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
-          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
+          String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobTrackerName, "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
           JobConf jconf = new JobConf();
           // run the wordcount example with caching
-          boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input",
+          boolean ret = MRCaching.launchMRCache(jobTrackerName, "/tmp/wc/input",
                                                 "/tmp/wc/output", "local", jconf,
                                                 "The quick brown fox\nhas many silly\n"
                                                     + "red fox sox\n");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=469642&r1=469641&r2=469642
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Oct 31 12:29:16 2006
@@ -145,14 +145,15 @@
       FileSystem fileSys = null;
       try {
           final int taskTrackers = 4;
-          final int jobTrackerPort = 50050;
-          final String jobTrackerName = "localhost:" + jobTrackerPort;
+          final int jobTrackerPort = 60050;
+
           Configuration conf = new Configuration();
           dfs = new MiniDFSCluster(65314, conf, true);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
           mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
                                  namenode, true);
+          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
                                                jobTrackerName, namenode);
           double error = Math.abs(Math.PI - estimate);