You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/14 11:08:48 UTC
svn commit: r421841 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/ipc/
Author: cutting
Date: Fri Jul 14 02:08:47 2006
New Revision: 421841
URL: http://svn.apache.org/viewvc?rev=421841&view=rev
Log:
HADOOP-252. Add versioning to RPC protocols. Contributed by Milind.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jul 14 02:08:47 2006
@@ -37,6 +37,9 @@
10. HADOOP-354. Make public methods to stop DFS daemons.
(Barry Kaplan via cutting)
+11. HADOOP-252. Add versioning to RPC protocols.
+ (Milind Bhandarkar via cutting)
+
Release 0.4.0 - 2006-06-28
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Fri Jul 14 02:08:47 2006
@@ -16,6 +16,7 @@
package org.apache.hadoop.dfs;
import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
/**********************************************************************
* ClientProtocol is used by a piece of DFS user code to communicate
@@ -24,8 +25,10 @@
*
* @author Mike Cafarella
**********************************************************************/
-interface ClientProtocol {
+interface ClientProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
///////////////////////////////////////
// File contents
///////////////////////////////////////
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jul 14 02:08:47 2006
@@ -97,9 +97,11 @@
/**
* Create a new DFSClient connected to the given namenode server.
*/
- public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
+ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+ throws IOException {
this.conf = conf;
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
+ this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, nameNodeAddr, conf);
try {
this.localName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException uhe) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Jul 14 02:08:47 2006
@@ -118,7 +118,8 @@
// get storage info and lock the data dir
storage = new DataStorage( datadir );
// connect to name node
- this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
+ this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID,
nameNodeAddr,
conf);
// find free port
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Fri Jul 14 02:08:47 2006
@@ -17,6 +17,7 @@
package org.apache.hadoop.dfs;
import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
/**********************************************************************
* Protocol that a DFS datanode uses to communicate with the NameNode.
@@ -27,7 +28,8 @@
*
* @author Michael Cafarella
**********************************************************************/
-interface DatanodeProtocol {
+interface DatanodeProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
// error code
final static int DISK_ERROR = 1;
final static int INVALID_BLOCK = 2;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Jul 14 02:08:47 2006
@@ -20,7 +20,6 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
import java.io.*;
@@ -57,6 +56,13 @@
* @author Mike Cafarella
**********************************************************/
public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ if (protocol.equals(ClientProtocol.class.getName())) {
+ return ClientProtocol.versionID;
+ } else {
+ return DatanodeProtocol.versionID;
+ }
+ }
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
public static final Log stateChangeLog = LogFactory.getLog( "org.apache.hadoop.dfs.StateChange");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri Jul 14 02:08:47 2006
@@ -165,10 +165,21 @@
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
- return Proxy.newProxyInstance(protocol.getClassLoader(),
+ public static VersionedProtocol getProxy(Class protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf)
+ throws RemoteException {
+ VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
+ protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(addr, conf));
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+ if (serverVersion == clientVersion) {
+ return proxy;
+ } else {
+ throw new RemoteException(protocol.getName(),
+ "RPC Server and Client Versions Mismatched. SID:"+serverVersion+
+ " CID:"+clientVersion);
+ }
}
/** Expert: Make multiple, parallel calls to a set of servers. */
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=421841&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Fri Jul 14 02:08:47 2006
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.io.UTF8;
+
+/**
+ * Superinterface of all protocols that use Hadoop RPC.
+ * Interfaces extending this one are also expected to declare
+ * a static final long versionID field.
+ * @author milindb
+ */
+public interface VersionedProtocol {
+ /**
+ * Return the implementer's version of the named protocol interface, for comparison with the client's version.
+ */
+ public long getProtocolVersion(String protocol, long clientVersion);
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Jul 14 02:08:47 2006
@@ -18,11 +18,15 @@
import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/**
* Protocol that a TaskTracker and the central JobTracker use to communicate.
* The JobTracker is the Server, which implements this protocol.
*/
-interface InterTrackerProtocol {
+interface InterTrackerProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Fri Jul 14 02:08:47 2006
@@ -32,6 +32,10 @@
private static class FakeUmbilical implements TaskUmbilicalProtocol {
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TaskUmbilicalProtocol.versionID;
+ }
+
public void done(String taskid) throws IOException {
LOG.info("Task " + taskid + " reporting done.");
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Jul 14 02:08:47 2006
@@ -187,6 +187,7 @@
} else {
this.jobSubmitClient = (JobSubmissionProtocol)
RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID,
JobTracker.getAddress(conf), conf);
}
}
@@ -196,7 +197,8 @@
*/
public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
this.jobSubmitClient = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr, conf);
+ RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, jobTrackAddr, conf);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Jul 14 02:08:47 2006
@@ -17,14 +17,16 @@
package org.apache.hadoop.mapred;
import java.io.*;
-import java.util.*;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
/**
* Protocol that a JobClient and the central JobTracker use to communicate. The
* JobClient can use these methods to submit a Job for execution, and learn about
* the current system status.
*/
-interface JobSubmissionProtocol {
+interface JobSubmissionProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
/**
* Submit a Job for execution. Returns the latest profile for
* that job.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jul 14 02:08:47 2006
@@ -87,6 +87,13 @@
tracker = null;
}
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ if (protocol.equals(InterTrackerProtocol.class.getName())) {
+ return InterTrackerProtocol.versionID;
+ } else {
+ return JobSubmissionProtocol.versionID;
+ }
+ }
/**
* A thread to timeout tasks that have been assigned to task trackers,
* but that haven't reported back yet.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jul 14 02:08:47 2006
@@ -35,6 +35,10 @@
private int map_tasks = 0;
private int reduce_tasks = 0;
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return JobSubmissionProtocol.versionID;
+ }
+
private class Job extends Thread
implements TaskUmbilicalProtocol {
private String file;
@@ -48,6 +52,10 @@
private Path localFile;
private FileSystem localFs;
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TaskUmbilicalProtocol.versionID;
+ }
+
public Job(String file, Configuration conf) throws IOException {
this.file = file;
this.id = "job_" + newId();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jul 14 02:08:47 2006
@@ -114,6 +114,9 @@
taskCleanupThread.start();
}
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TaskUmbilicalProtocol.versionID;
+ }
/**
* Do the real constructor work here. It's in a separate method
* so we can call it again and "recycle" the object after calling
@@ -160,7 +163,8 @@
this.mapOutputFile.cleanupStorage();
this.justStarted = true;
- this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
+ this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class,
+ InterTrackerProtocol.versionID, jobTrackAddr, this.fConf);
this.running = true;
}
@@ -1002,6 +1006,7 @@
String taskid = args[1];
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID,
new InetSocketAddress(port),
defaultConf);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Jul 14 02:08:47 2006
@@ -18,12 +18,16 @@
import java.io.IOException;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/** Protocol that task child process uses to contact its parent process. The
* parent is a daemon which polls the central master for a new map or
* reduce task and runs it as a child process. All communication between child
* and parent is via this protocol. */
-interface TaskUmbilicalProtocol {
+interface TaskUmbilicalProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
/** Called when a child task process starts, to get its task.*/
Task getTask(String taskid) throws IOException;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=421841&r1=421840&r2=421841&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jul 14 02:08:47 2006
@@ -30,6 +30,8 @@
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/** Unit tests for RPC. */
public class TestRPC extends TestCase {
private static final int PORT = 1234;
@@ -46,7 +48,9 @@
public TestRPC(String name) { super(name); }
- public interface TestProtocol {
+ public interface TestProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
void ping() throws IOException;
String echo(String value) throws IOException;
String[] echo(String[] value) throws IOException;
@@ -59,6 +63,10 @@
public class TestImpl implements TestProtocol {
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TestProtocol.versionID;
+ }
+
public void ping() {}
public String echo(String value) throws IOException { return value; }
@@ -98,7 +106,7 @@
InetSocketAddress addr = new InetSocketAddress(PORT);
TestProtocol proxy =
- (TestProtocol)RPC.getProxy(TestProtocol.class, addr, conf);
+ (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
proxy.ping();