Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/18 11:06:38 UTC

svn commit: r423017 - in /lucene/hadoop/trunk/src: java/org/apache/hadoop/dfs/ java/org/apache/hadoop/ipc/ java/org/apache/hadoop/mapred/ test/org/apache/hadoop/dfs/ test/org/apache/hadoop/fs/ test/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Jul 18 02:06:36 2006
New Revision: 423017

URL: http://svn.apache.org/viewvc?rev=423017&view=rev
Log:
HADOOP-364.  Fix some problems introduced by HADOOP-252.  In particular, handle the case where RPC clients start before the daemons they connect to, plus other improvements to RPC versioning.  Contributed by Owen.
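
The core of the fix is the new RPC.waitForProxy() call: daemons that previously obtained a proxy with RPC.getProxy() (which fails immediately if the server is not yet listening) now block and retry until the server answers, as the DataNode and TaskTracker hunks below show.  A minimal sketch of the pattern, assuming a hypothetical FooProtocol interface that extends VersionedProtocol and declares a versionID constant (neither is part of this commit):

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class FooClient {
  // FooProtocol stands in for a real protocol such as DatanodeProtocol
  // or InterTrackerProtocol.
  public static FooProtocol connect(InetSocketAddress addr, Configuration conf)
      throws IOException {
    // Retries every 10 seconds until the server at addr is reachable;
    // throws RPC.VersionMismatch if client and server versions disagree.
    return (FooProtocol) RPC.waitForProxy(FooProtocol.class,
                                          FooProtocol.versionID,
                                          addr, conf);
  }
}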

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Jul 18 02:06:36 2006
@@ -118,10 +118,11 @@
       // get storage info and lock the data dir
       storage = new DataStorage( datadir );
       // connect to name node
-      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
-                                                      DatanodeProtocol.versionID,
-                                                      nameNodeAddr, 
-                                                      conf);
+      this.namenode = (DatanodeProtocol) 
+          RPC.waitForProxy(DatanodeProtocol.class,
+                           DatanodeProtocol.versionID,
+                           nameNodeAddr, 
+                           conf);
       // find free port
       ServerSocket ss = null;
       int tmpPort = conf.getInt("dfs.datanode.port", 50010);
@@ -170,20 +171,7 @@
      * @throws IOException
      */
     private void register() throws IOException {
-      while (shouldRun) {
-        try {
-          dnRegistration = namenode.register( dnRegistration );
-          break;
-        } catch( ConnectException se ) {  // namenode has not been started
-          LOG.info("Namenode not available yet, Zzzzz...");
-        } catch( SocketTimeoutException te ) {  // namenode is busy
-          LOG.info("Problem connecting to Namenode: " + 
-                   StringUtils.stringifyException(te));
-        }
-        try {
-          Thread.sleep(10 * 1000);
-        } catch (InterruptedException ie) {}
-      }
+      dnRegistration = namenode.register( dnRegistration );
       if( storage.getStorageID().equals("") ) {
         storage.setStorageID( dnRegistration.getStorageID());
         storage.write();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Tue Jul 18 02:06:36 2006
@@ -22,12 +22,15 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.io.*;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.*;
 
 /** A simple RPC mechanism.
@@ -163,22 +166,89 @@
     }
   }
 
+  /**
+   * A version mismatch for the RPC protocol.
+   * @author Owen O'Malley
+   */
+  public static class VersionMismatch extends IOException {
+    private String interfaceName;
+    private long clientVersion;
+    private long serverVersion;
+    
+    /**
+     * Create a version mismatch exception
+     * @param interfaceName the name of the protocol mismatch
+     * @param clientVersion the client's version of the protocol
+     * @param serverVersion the server's version of the protocol
+     */
+    public VersionMismatch(String interfaceName, long clientVersion,
+                           long serverVersion) {
+      super("Protocol " + interfaceName + " version mismatch. (client = " +
+            clientVersion + ", server = " + serverVersion + ")");
+      this.interfaceName = interfaceName;
+      this.clientVersion = clientVersion;
+      this.serverVersion = serverVersion;
+    }
+    
+    /**
+     * Get the interface name
+     * @return the Java class name
+     *          (e.g. org.apache.hadoop.mapred.InterTrackerProtocol)
+     */
+    public String getInterfaceName() {
+      return interfaceName;
+    }
+    
+    /**
+     * Get the client's preferred version
+     */
+    public long getClientVersion() {
+      return clientVersion;
+    }
+    
+    /**
+     * Get the version the server agreed to.
+     */
+    public long getServerVersion() {
+      return serverVersion;
+    }
+  }
+  
+  public static VersionedProtocol waitForProxy(Class protocol,
+                                               long clientVersion,
+                                               InetSocketAddress addr,
+                                               Configuration conf
+                                               ) throws IOException {
+    while (true) {
+      try {
+        return getProxy(protocol, clientVersion, addr, conf);
+      } catch( ConnectException se ) {  // namenode has not been started
+        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+      } catch( SocketTimeoutException te ) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + addr);
+      }
+      try {
+        Thread.sleep(10*1000);
+      } catch (InterruptedException ie) {
+        // IGNORE
+      }
+    }
+  }
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
   public static VersionedProtocol getProxy(Class protocol, long clientVersion,
-      InetSocketAddress addr, Configuration conf)
-  throws RemoteException {
+                                           InetSocketAddress addr, Configuration conf) throws IOException {
     VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
                                   protocol.getClassLoader(),
                                   new Class[] { protocol },
                                   new Invoker(addr, conf));
-    long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
+                                                  clientVersion);
     if (serverVersion == clientVersion) {
       return proxy;
     } else {
-      throw new RemoteException(protocol.getName(),
-          "RPC Server and Client Versions Mismatched. SID:"+serverVersion+
-          " CID:"+clientVersion);
+      throw new VersionMismatch(protocol.getName(), clientVersion, 
+                                serverVersion);
     }
   }
 

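The new RPC.VersionMismatch exception exposes the interface name and both protocol versions as fields, rather than packing them into a RemoteException message string.  A sketch of how a caller might report a mismatch (the VersionCheck class and its error handling are illustrative only, not part of this commit):

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

public class VersionCheck {
  public static VersionedProtocol getCheckedProxy(Class protocol, long version,
      InetSocketAddress addr, Configuration conf) throws IOException {
    try {
      return RPC.getProxy(protocol, version, addr, conf);
    } catch (RPC.VersionMismatch e) {
      // Structured accessors replace string parsing of the old error message.
      System.err.println("Protocol " + e.getInterfaceName() +
                         ": client version " + e.getClientVersion() +
                         ", server version " + e.getServerVersion());
      throw e;
    }
  }
}
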
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Tue Jul 18 02:06:36 2006
@@ -16,7 +16,7 @@
 
 package org.apache.hadoop.ipc;
 
-import org.apache.hadoop.io.UTF8;
+import java.io.IOException;
 
 /**
  * Superclass of all protocols that use Hadoop RPC.
@@ -25,8 +25,13 @@
  * @author milindb
  */
 public interface VersionedProtocol {
+  
   /**
-   * Return protocol version corresponding to protocol interface
+   * Return protocol version corresponding to protocol interface.
+   * @param protocol The classname of the protocol interface
+   * @param clientVersion The version of the protocol that the client speaks
+   * @return the version that the server will speak
    */
-  public long getProtocolVersion(String protocol, long clientVersion);
+  public long getProtocolVersion(String protocol, 
+                                 long clientVersion) throws IOException;
 }
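
With getProtocolVersion() now declared to throw IOException, a server-side implementation can reject a protocol name it does not recognize instead of returning a meaningless version number.  A minimal sketch, assuming a hypothetical BarProtocol (neither the interface nor the server below is part of this commit):

import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;

interface BarProtocol extends VersionedProtocol {
  long versionID = 1L;                 // bumped whenever the interface changes
}

public class BarServer implements BarProtocol {
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    if (!BarProtocol.class.getName().equals(protocol)) {
      throw new IOException("Unknown protocol: " + protocol);
    }
    // RPC.getProxy() accepts the proxy only when this matches clientVersion.
    return BarProtocol.versionID;
  }
}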

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jul 18 02:06:36 2006
@@ -61,6 +61,9 @@
     boolean shuttingDown = false;
     
     TreeMap tasks = null;
+    /**
+     * Map from taskId -> TaskInProgress.
+     */
     TreeMap runningTasks = null;
     int mapTotal = 0;
     int reduceTotal = 0;
@@ -163,8 +166,10 @@
         this.mapOutputFile.cleanupStorage();
         this.justStarted = true;
 
-        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class,
-            InterTrackerProtocol.versionID, jobTrackAddr, this.fConf);
+        this.jobClient = (InterTrackerProtocol) 
+                          RPC.waitForProxy(InterTrackerProtocol.class,
+                                           InterTrackerProtocol.versionID, 
+                                           jobTrackAddr, this.fConf);
         
         this.running = true;
     }
@@ -1139,7 +1144,8 @@
           JobConf conf=new JobConf();
           new TaskTracker(conf).run();
         } catch (IOException e) {
-            LOG.warn( "Can not start task tracker because "+e.getMessage());
+            LOG.warn( "Can not start task tracker because "+
+                      StringUtils.stringifyException(e));
             System.exit(-1);
         }
     }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Jul 18 02:06:36 2006
@@ -24,6 +24,18 @@
   class NameNodeRunner implements Runnable {
     private NameNode node;
     
+    public boolean isUp() {
+      if (node == null) {
+        return false;
+      }
+      try {
+        long[] sizes = node.getStats();
+        return sizes[0] != 0;
+      } catch (IOException ie) {
+        return false;
+      }
+    }
+    
     /**
      * Create the name node and run it.
      */
@@ -82,8 +94,11 @@
   
   /**
    * Create the config and start up the servers.
+   * @param dataNodeFirst should the datanode be brought up before the namenode?
    */
-  public MiniDFSCluster(int namenodePort, Configuration conf) throws IOException {
+  public MiniDFSCluster(int namenodePort, 
+                        Configuration conf,
+                        boolean dataNodeFirst) throws IOException {
     this.conf = conf;
     conf.set("fs.default.name", 
              "localhost:"+ Integer.toString(namenodePort));
@@ -98,17 +113,21 @@
     NameNode.format(conf);
     nameNode = new NameNodeRunner();
     nameNodeThread = new Thread(nameNode);
-    nameNodeThread.start();
-    try {                                     // let namenode get started
-      Thread.sleep(2000);
-    } catch(InterruptedException e) {
-    }
     dataNode = new DataNodeRunner();
     dataNodeThread = new Thread(dataNode);
-    dataNodeThread.start();
-    try {                                     // let daemons get started
-      Thread.sleep(2000);
-    } catch(InterruptedException e) {
+    if (dataNodeFirst) {
+      dataNodeThread.start();      
+      nameNodeThread.start();      
+    } else {
+      nameNodeThread.start();
+      dataNodeThread.start();      
+    }
+    while (!nameNode.isUp()) {
+      try {                                     // let daemons get started
+        System.out.println("waiting for dfs minicluster to start");
+        Thread.sleep(2000);
+      } catch(InterruptedException e) {
+      }
     }
   }
   

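The extra boolean lets tests start the datanode thread before the namenode, exercising the new waitForProxy() retry path; TestMiniMRWithDFS below does exactly that.  A sketch of a standalone test in the same style (the class name and port are made up, and it assumes the cluster's existing getFileSystem() and shutdown() helpers):

import java.io.IOException;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;

public class TestDataNodeFirst extends TestCase {
  public void testDataNodeBeforeNameNode() throws IOException {
    Configuration conf = new Configuration();
    // Passing true starts the datanode before the namenode, so the datanode
    // must wait in RPC.waitForProxy() until the namenode begins answering.
    MiniDFSCluster cluster = new MiniDFSCluster(65320, conf, true);
    try {
      assertNotNull(cluster.getFileSystem());
    } finally {
      cluster.shutdown();
    }
  }
}
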
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java Tue Jul 18 02:06:36 2006
@@ -38,7 +38,7 @@
    */
   public void testWorkingDirectory() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf);
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
     FileSystem fileSys = cluster.getFileSystem();
     try {
       Path orig_path = fileSys.getWorkingDirectory();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Jul 18 02:06:36 2006
@@ -173,7 +173,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65314, conf);
+      cluster = new MiniDFSCluster(65314, conf, false);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
@@ -195,7 +195,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65316, conf);
+      cluster = new MiniDFSCluster(65316, conf, false);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
@@ -217,7 +217,7 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(65318, conf);
+      cluster = new MiniDFSCluster(65318, conf, false);
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Jul 18 02:06:36 2006
@@ -130,7 +130,8 @@
     public MiniMRCluster(int jobTrackerPort,
             int taskTrackerPort,
             int numTaskTrackers,
-            String namenode) throws IOException {
+            String namenode,
+            boolean taskTrackerFirst) throws IOException {
         this.jobTrackerPort = jobTrackerPort;
         this.taskTrackerPort = taskTrackerPort;
         this.numTaskTrackers = numTaskTrackers;
@@ -151,10 +152,8 @@
         pw.close();
         jobTracker = new JobTrackerRunner();
         jobTrackerThread = new Thread(jobTracker);
-        jobTrackerThread.start();
-        try {                                     // let jobTracker get started
-            Thread.sleep(2000);
-        } catch(InterruptedException e) {
+        if (!taskTrackerFirst) {
+          jobTrackerThread.start();
         }
         for (int idx = 0; idx < numTaskTrackers; idx++) {
             TaskTrackerRunner taskTracker = new TaskTrackerRunner();
@@ -163,6 +162,9 @@
             taskTrackerList.add(taskTracker);
             taskTrackerThreadList.add(taskTrackerThread);
         }
+        if (taskTrackerFirst) {
+          jobTrackerThread.start();
+        }
         try {                                     // let taskTrackers get started
             Thread.sleep(2000);
         } catch(InterruptedException e) {
@@ -201,7 +203,7 @@
     
     public static void main(String[] args) throws IOException {
         System.out.println("Bringing up Jobtracker and tasktrackers.");
-        MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local");
+        MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local", false);
         System.out.println("JobTracker and TaskTrackers are up.");
         mr.shutdown();
         System.out.println("JobTracker and TaskTrackers brought down.");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Tue Jul 18 02:06:36 2006
@@ -29,7 +29,7 @@
     public void testBringUp() throws IOException {
       MiniMRCluster mr = null;
       try {
-          mr = new MiniMRCluster(50000, 50010, 1, "local");
+          mr = new MiniMRCluster(50000, 50010, 1, "local", false);
       } finally {
           if (mr != null) { mr.shutdown(); }
       }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jul 18 02:06:36 2006
@@ -32,7 +32,7 @@
   public void testWithLocal() throws IOException {
       MiniMRCluster mr = null;
       try {
-          mr = new MiniMRCluster(60030, 60040, 2, "local");
+          mr = new MiniMRCluster(60030, 60040, 2, "local", false);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Jul 18 02:06:36 2006
@@ -39,10 +39,10 @@
       FileSystem fileSys = null;
       try {
           Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(65314, conf);
+          dfs = new MiniDFSCluster(65314, conf, true);
           fileSys = dfs.getFileSystem();
           namenode = fileSys.getName();
-          mr = new MiniMRCluster(50050, 50060, 4, namenode);
+          mr = new MiniMRCluster(50050, 50060, 4, namenode, true);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode);
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));