You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/14 11:08:48 UTC

svn commit: r421841 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/ipc/

Author: cutting
Date: Fri Jul 14 02:08:47 2006
New Revision: 421841

URL: http://svn.apache.org/viewvc?rev=421841&view=rev
Log:
HADOOP-252.  Add versioning to RPC protocols.  Contributed by Milind.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jul 14 02:08:47 2006
@@ -37,6 +37,9 @@
 10. HADOOP-354.  Make public methods to stop DFS daemons.
    (Barry Kaplan via cutting)
 
+11. HADOOP-252.  Add versioning to RPC protocols.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Fri Jul 14 02:08:47 2006
@@ -16,6 +16,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**********************************************************************
  * ClientProtocol is used by a piece of DFS user code to communicate 
@@ -24,8 +25,10 @@
  *
  * @author Mike Cafarella
  **********************************************************************/
-interface ClientProtocol {
+interface ClientProtocol extends VersionedProtocol {
 
+  public static final long versionID = 1L;
+  
     ///////////////////////////////////////
     // File contents
     ///////////////////////////////////////

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jul 14 02:08:47 2006
@@ -97,9 +97,11 @@
     /** 
      * Create a new DFSClient connected to the given namenode server.
      */
-    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
+    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+    throws IOException {
         this.conf = conf;
-        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
+        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+            ClientProtocol.versionID, nameNodeAddr, conf);
         try {
             this.localName = InetAddress.getLocalHost().getHostName();
         } catch (UnknownHostException uhe) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Jul 14 02:08:47 2006
@@ -118,7 +118,8 @@
       // get storage info and lock the data dir
       storage = new DataStorage( datadir );
       // connect to name node
-      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, 
+      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
+                                                      DatanodeProtocol.versionID,
                                                       nameNodeAddr, 
                                                       conf);
       // find free port

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Fri Jul 14 02:08:47 2006
@@ -17,6 +17,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**********************************************************************
  * Protocol that a DFS datanode uses to communicate with the NameNode.
@@ -27,7 +28,8 @@
  *
  * @author Michael Cafarella
  **********************************************************************/
-interface DatanodeProtocol {
+interface DatanodeProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
   // error code
   final static int DISK_ERROR = 1;
   final static int INVALID_BLOCK = 2;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Jul 14 02:08:47 2006
@@ -20,7 +20,6 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
 
 import java.io.*;
 
@@ -57,6 +56,13 @@
  * @author Mike Cafarella
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
+    public long getProtocolVersion(String protocol, long clientVersion) { 
+      if (protocol.equals(ClientProtocol.class.getName())) {
+        return ClientProtocol.versionID; 
+      } else {
+        return DatanodeProtocol.versionID;
+      }
+    }
     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
     public static final Log stateChangeLog = LogFactory.getLog( "org.apache.hadoop.dfs.StateChange");
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri Jul 14 02:08:47 2006
@@ -165,10 +165,21 @@
 
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
-    return Proxy.newProxyInstance(protocol.getClassLoader(),
+  public static VersionedProtocol getProxy(Class protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf)
+  throws RemoteException {
+    VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
+                                  protocol.getClassLoader(),
                                   new Class[] { protocol },
                                   new Invoker(addr, conf));
+    long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+    if (serverVersion == clientVersion) {
+      return proxy;
+    } else {
+      throw new RemoteException(protocol.getName(),
+          "RPC Server and Client Versions Mismatched. SID:"+serverVersion+
+          " CID:"+clientVersion);
+    }
   }
 
   /** Expert: Make multiple, parallel calls to a set of servers. */

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=421841&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Fri Jul 14 02:08:47 2006
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.io.UTF8;
+
+/**
+ * Superinterface of all protocols that use Hadoop RPC.
+ * Interfaces extending this one are also expected to declare
+ * a static final long versionID field.
+ * @author milindb
+ */
+public interface VersionedProtocol {
+  /**
+   * Return the protocol version corresponding to the named protocol interface.
+   */
+  public long getProtocolVersion(String protocol, long clientVersion);
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Jul 14 02:08:47 2006
@@ -18,11 +18,15 @@
 
 import java.io.*;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** 
  * Protocol that a TaskTracker and the central JobTracker use to communicate.
  * The JobTracker is the Server, which implements this protocol.
  */ 
-interface InterTrackerProtocol {
+interface InterTrackerProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
+  
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Fri Jul 14 02:08:47 2006
@@ -32,6 +32,10 @@
 
   private static class FakeUmbilical implements TaskUmbilicalProtocol {
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
     public void done(String taskid) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Jul 14 02:08:47 2006
@@ -187,6 +187,7 @@
         } else {
           this.jobSubmitClient = (JobSubmissionProtocol) 
             RPC.getProxy(JobSubmissionProtocol.class,
+                         JobSubmissionProtocol.versionID,
                          JobTracker.getAddress(conf), conf);
         }        
     }
@@ -196,7 +197,8 @@
      */
     public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
         this.jobSubmitClient = (JobSubmissionProtocol) 
-            RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr, conf);
+            RPC.getProxy(JobSubmissionProtocol.class,
+                         JobSubmissionProtocol.versionID, jobTrackAddr, conf);
     }
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Jul 14 02:08:47 2006
@@ -17,14 +17,16 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.util.*;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** 
  * Protocol that a JobClient and the central JobTracker use to communicate.  The
  * JobClient can use these methods to submit a Job for execution, and learn about
  * the current system status.
  */ 
-interface JobSubmissionProtocol {
+interface JobSubmissionProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
     /**
      * Submit a Job for execution.  Returns the latest profile for
      * that job.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jul 14 02:08:47 2006
@@ -87,6 +87,13 @@
       tracker = null;
     }
     
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      if (protocol.equals(InterTrackerProtocol.class.getName())) {
+        return InterTrackerProtocol.versionID;
+      } else {
+        return JobSubmissionProtocol.versionID;
+      }
+    }
     /**
      * A thread to timeout tasks that have been assigned to task trackers,
      * but that haven't reported back yet.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jul 14 02:08:47 2006
@@ -35,6 +35,10 @@
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    return JobSubmissionProtocol.versionID;
+  }
+  
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
     private String file;
@@ -48,6 +52,10 @@
     private Path localFile;
     private FileSystem localFs;
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
     public Job(String file, Configuration conf) throws IOException {
       this.file = file;
       this.id = "job_" + newId();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jul 14 02:08:47 2006
@@ -114,6 +114,9 @@
       taskCleanupThread.start();
     }
     
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
     /**
      * Do the real constructor work here.  It's in a separate method
      * so we can call it again and "recycle" the object after calling
@@ -160,7 +163,8 @@
         this.mapOutputFile.cleanupStorage();
         this.justStarted = true;
 
-        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
+        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class,
+            InterTrackerProtocol.versionID, jobTrackAddr, this.fConf);
         
         this.running = true;
     }
@@ -1002,6 +1006,7 @@
           String taskid = args[1];
           TaskUmbilicalProtocol umbilical =
             (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+                                                TaskUmbilicalProtocol.versionID,
                                                 new InetSocketAddress(port), 
                                                 defaultConf);
             

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Jul 14 02:08:47 2006
@@ -18,12 +18,16 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** Protocol that task child process uses to contact its parent process.  The
  * parent is a daemon which which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
-interface TaskUmbilicalProtocol {
+interface TaskUmbilicalProtocol extends VersionedProtocol {
 
+  public static final long versionID = 1L;
+  
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jul 14 02:08:47 2006
@@ -30,6 +30,8 @@
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
   private static final int PORT = 1234;
@@ -46,7 +48,9 @@
 
   public TestRPC(String name) { super(name); }
 	
-  public interface TestProtocol {
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+    
     void ping() throws IOException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
@@ -59,6 +63,10 @@
 
   public class TestImpl implements TestProtocol {
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+    
     public void ping() {}
 
     public String echo(String value) throws IOException { return value; }
@@ -98,7 +106,7 @@
 
     InetSocketAddress addr = new InetSocketAddress(PORT);
     TestProtocol proxy =
-      (TestProtocol)RPC.getProxy(TestProtocol.class, addr, conf);
+      (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
     
     proxy.ping();