You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 04:58:20 UTC

svn commit: r1294017 [2/2] - in /hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project: ./ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/main/docs/ hadoop-common/src/main/java/ hadoop-common/src/main/java/org/apa...

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Feb 27 03:58:10 2012
@@ -42,7 +42,9 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,28 +68,33 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
 import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -108,12 +115,66 @@ public abstract class Server {
   // 4 : Introduced SASL security layer
   // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
   //     in ObjectWritable to efficiently transmit arrays of primitives
-  public static final byte CURRENT_VERSION = 5;
+  // 6 : Made RPC payload header explicit
+  public static final byte CURRENT_VERSION = 6;
 
   /**
    * Initial and max size of response buffer
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue {
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register a RPC kind and the class to deserialize the rpc request.
+   * 
+   * Called by static initializers of rpcKind Engines
+   * @param rpcKind
+   * @param rpcRequestWrapperClass - this class is used to deserialze the
+   *  the rpc request.
+   *  @param rpcInvoker - use to process the calls on SS.
+   */
+  
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    RpcKindMapValue  old = 
+        rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    if (old != null) {
+      rpcKindMap.put(rpcKind, old);
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    LOG.debug("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  public Class<? extends Writable> getRpcRequestWrapper(
+      RpcKind rpcKind) {
+    if (rpcRequestClass != null)
+       return rpcRequestClass;
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcRequestWrapperClass; 
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcInvoker; 
+  }
+  
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
@@ -178,7 +239,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -239,10 +300,21 @@ public abstract class Server {
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
    */
+  @VisibleForTesting
   public RpcMetrics getRpcMetrics() {
     return rpcMetrics;
   }
 
+  @VisibleForTesting
+  public RpcDetailedMetrics getRpcDetailedMetrics() {
+    return rpcDetailedMetrics;
+  }
+  
+  @VisibleForTesting
+  Iterable<? extends Thread> getHandlers() {
+    return Arrays.asList(handlers);
+  }
+
   /**
    * Refresh the service authorization ACL for the service handled by this server.
    */
@@ -261,28 +333,33 @@ public abstract class Server {
 
   /** A call queued for handling. */
   private static class Call {
-    private int id;                               // the client's call id
-    private Writable param;                       // the parameter passed
-    private Connection connection;                // connection to client
-    private long timestamp;     // the time received when response is null
-                                   // the time served when response is not null
-    private ByteBuffer response;                      // the response for this call
-
-    public Call(int id, Writable param, Connection connection) { 
-      this.id = id;
-      this.param = param;
+    private final int callId;             // the client's call id
+    private final Writable rpcRequest;    // Serialized Rpc request from client
+    private final Connection connection;  // connection to client
+    private long timestamp;               // time received when response is null
+                                          // time served when response is not null
+    private ByteBuffer rpcResponse;       // the response for this call
+    private final RpcKind rpcKind;
+
+    public Call(int id, Writable param, Connection connection) {
+      this( id,  param,  connection, RpcKind.RPC_BUILTIN );    
+    }
+    public Call(int id, Writable param, Connection connection, RpcKind kind) { 
+      this.callId = id;
+      this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
-      this.response = null;
+      this.rpcResponse = null;
+      this.rpcKind = kind;
     }
     
     @Override
     public String toString() {
-      return param.toString() + " from " + connection.toString();
+      return rpcRequest.toString() + " from " + connection.toString();
     }
 
     public void setResponse(ByteBuffer response) {
-      this.response = response;
+      this.rpcResponse = response;
     }
   }
 
@@ -781,17 +858,17 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                       call.connection);
           }
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channelWrite(channel, call.response);
+          int numBytes = channelWrite(channel, call.rpcResponse);
           if (numBytes < 0) {
             return true;
           }
-          if (!call.response.hasRemaining()) {
+          if (!call.rpcResponse.hasRemaining()) {
             call.connection.decRpcCount();
             if (numElements == 1) {    // last call fully processes.
               done = true;             // no more data for this channel.
@@ -799,7 +876,7 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -827,7 +904,7 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote partial " + numBytes + 
                         " bytes.");
             }
@@ -893,7 +970,7 @@ public abstract class Server {
     private InetAddress addr;
     
     ConnectionHeader header = new ConnectionHeader();
-    Class<?> protocol;
+    String protocolName;
     boolean useSasl;
     SaslServer saslServer;
     private AuthMethod authMethod;
@@ -1284,15 +1361,8 @@ public abstract class Server {
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(buf));
       header.readFields(in);
-      try {
-        String protocolClassName = header.getProtocol();
-        if (protocolClassName != null) {
-          protocol = getProtocolClass(header.getProtocol(), conf);
-          rpcDetailedMetrics.init(protocol);
-        }
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown protocol: " + header.getProtocol());
-      }
+      protocolName = header.getProtocol();
+
       
       UserGroupInformation protocolUser = header.getUgi();
       if (!useSasl) {
@@ -1384,18 +1454,43 @@ public abstract class Server {
     private void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcPayloadHeader header = new RpcPayloadHeader();
+      header.readFields(dis);           // Read the RpcPayload header
         
       if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
-      Writable param;
-      try {
-        param = ReflectionUtils.newInstance(paramClass, conf);//read param
-        param.readFields(dis);
+        LOG.debug(" got #" + header.getCallId());
+      if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+        throw new IOException("IPC Server does not implement operation" + 
+              header.getOperation());
+      }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
+      Writable rpcRequest;
+      try { //Read the rpc request
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
+        rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
-                 getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this);
+                 getHostAddress() + "on connection protocol " +
+            this.protocolName + " for rpcKind " + header.getkind(),  t);
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1405,7 +1500,7 @@ public abstract class Server {
         return;
       }
         
-      Call call = new Call(id, param, this);
+      Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
@@ -1469,8 +1564,8 @@ public abstract class Server {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": has Call#" + call.callId + 
+                "for RpcKind " + call.rpcKind + " from " + call.connection);
           
           String errorClass = null;
           String error = null;
@@ -1481,7 +1576,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocol, call.param, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1490,8 +1585,8 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocol, 
-                                   call.param, call.timestamp);
+                       return call(call.rpcKind, call.connection.protocolName, 
+                                   call.rpcRequest, call.timestamp);
 
                      }
                    }
@@ -1543,24 +1638,33 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpcRequestClass must have been 
+   * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
@@ -1641,7 +1745,7 @@ public abstract class Server {
   throws IOException {
     response.reset();
     DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.id);                // write call id
+    out.writeInt(call.callId);                // write call id
     out.writeInt(status.state);           // write status
 
     if (status == Status.SUCCESS) {
@@ -1758,17 +1862,17 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(Class<?> protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   /**
    * Authorize the incoming client connection.
@@ -1918,5 +2022,5 @@ public abstract class Server {
 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Mon Feb 27 03:58:10 2012
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
    * @return the version that the server will speak
    * @throws IOException if any IO error occurs
    */
-  @Deprecated
   public long getProtocolVersion(String protocol,
                                  long clientVersion) throws IOException;
 

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Feb 27 03:58:10 2012
@@ -18,23 +18,23 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,8 +49,38 @@ public class WritableRpcEngine implement
   
   //writableRpcVersion should be updated if there is a change
   //in format of the rpc messages.
-  public static long writableRpcVersion = 1L;
+  
+  // 2L - added declared class to Invocation
+  public static final long writableRpcVersion = 2L;
+  
+  /**
+   * Whether or not this class has been initialized.
+   */
+  private static boolean isInitialized = false;
+  
+  static { 
+    ensureInitialized();
+  }
+  
+  /**
+   * Initialize this class if it isn't already.
+   */
+  public static synchronized void ensureInitialized() {
+    if (!isInitialized) {
+      initialize();
+    }
+  }
+  
+  /**
+   * Register the rpcRequest deserializer for WritableRpcEngine
+   */
+  private static synchronized void initialize() {
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
+    isInitialized = true;
+  }
 
+  
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
     private String methodName;
@@ -59,11 +89,13 @@ public class WritableRpcEngine implement
     private Configuration conf;
     private long clientVersion;
     private int clientMethodsHash;
+    private String declaringClassProtocolName;
     
     //This could be different from static writableRpcVersion when received
     //at server, if client is using a different version.
     private long rpcVersion;
 
+    @SuppressWarnings("unused") // called when deserializing an invocation
     public Invocation() {}
 
     public Invocation(Method method, Object[] parameters) {
@@ -76,18 +108,12 @@ public class WritableRpcEngine implement
         clientVersion = 0;
         clientMethodsHash = 0;
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
+      this.declaringClassProtocolName = 
+          RPC.getProtocolName(method.getDeclaringClass());
     }
 
     /** The name of the method invoked. */
@@ -103,6 +129,7 @@ public class WritableRpcEngine implement
       return clientVersion;
     }
 
+    @SuppressWarnings("unused")
     private int getClientMethodsHash() {
       return clientMethodsHash;
     }
@@ -115,8 +142,10 @@ public class WritableRpcEngine implement
       return rpcVersion;
     }
 
+    @SuppressWarnings("deprecation")
     public void readFields(DataInput in) throws IOException {
       rpcVersion = in.readLong();
+      declaringClassProtocolName = UTF8.readString(in);
       methodName = UTF8.readString(in);
       clientVersion = in.readLong();
       clientMethodsHash = in.readInt();
@@ -124,13 +153,16 @@ public class WritableRpcEngine implement
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = 
+            ObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
+    @SuppressWarnings("deprecation")
     public void write(DataOutput out) throws IOException {
       out.writeLong(rpcVersion);
+      UTF8.writeString(out, declaringClassProtocolName);
       UTF8.writeString(out, methodName);
       out.writeLong(clientVersion);
       out.writeInt(clientMethodsHash);
@@ -169,7 +201,7 @@ public class WritableRpcEngine implement
 
   private static ClientCache CLIENTS=new ClientCache();
   
-  private static class Invoker implements InvocationHandler {
+  private static class Invoker implements RpcInvocationHandler {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
@@ -191,7 +223,7 @@ public class WritableRpcEngine implement
       }
 
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), remoteId);
+        client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -200,12 +232,17 @@ public class WritableRpcEngine implement
     }
     
     /* close the IPC client that's responsible for this invoker's RPCs */ 
-    synchronized private void close() {
+    synchronized public void close() {
       if (!isClosed) {
         isClosed = true;
         CLIENTS.stopClient(client);
       }
     }
+
+    @Override
+    public ConnectionId getConnectionId() {
+      return remoteId;
+    }
   }
   
   // for unit testing only
@@ -231,15 +268,6 @@ public class WritableRpcEngine implement
             factory, rpcTimeout));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   * @param proxy the proxy to be stopped
-   */
-  public void stopProxy(Object proxy) {
-    ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-  }
-
   
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public Object[] call(Method method, Object[][] params,
@@ -273,134 +301,238 @@ public class WritableRpcEngine implement
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public Server getServer(Class<?> protocol,
-                          Object instance, String bindAddress, int port,
-                          int numHandlers, int numReaders, int queueSizePerHandler,
-                          boolean verbose, Configuration conf,
+  public RPC.Server getServer(Class<?> protocolClass,
+                      Object protocolImpl, String bindAddress, int port,
+                      int numHandlers, int numReaders, int queueSizePerHandler,
+                      boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, 
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
 
+
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private Object instance;
-    private boolean verbose;
-
-    /** Construct an RPC server.
+    /**
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
+     * 
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
+     */
+    @Deprecated
+    public Server(Object instance, Configuration conf, String bindAddress,
+        int port) throws IOException {
+      this(null, instance, conf,  bindAddress, port);
+    }
+    
+    
+    /** Construct an RPC server.
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+    public Server(Class<?> protocolClass, Object protocolImpl, 
+        Configuration conf, String bindAddress, int port) 
       throws IOException {
-      this(instance, conf,  bindAddress, port, 1, -1, -1, false, null);
+      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
+          false, null);
     }
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
+    /** 
+     * Construct an RPC server.
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * 
+     * @deprecated use Server#Server(Class, Object, 
+     *      Configuration, String, int, int, int, int, boolean, SecretManager)
+     */
+    @Deprecated
+    public Server(Object protocolImpl, Configuration conf, String bindAddress,
+        int port, int numHandlers, int numReaders, int queueSizePerHandler,
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
+            throws IOException {
+       this(null, protocolImpl,  conf,  bindAddress,   port,
+                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
+                   secretManager);
+   
     }
     
-    /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
+    /** 
+     * Construct an RPC server.
+     * @param protocolClass - the protocol being registered
+     *     can be null for compatibility with old usage (see below for details)
+     * @param protocolImpl the protocol impl that will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
-                  SecretManager<? extends TokenIdentifier> secretManager) 
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler, 
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
-          classNameBase(instance.getClass().getName()), secretManager);
-      this.instance = instance;
+          classNameBase(protocolImpl.getClass().getName()), secretManager);
+
       this.verbose = verbose;
-    }
+      
+      
+      Class<?>[] protocols;
+      if (protocolClass == null) { // derive protocol from impl
+        /*
+         * In order to remain compatible with the old usage where a single
+         * target protocolImpl is suppled for all protocol interfaces, and
+         * the protocolImpl is derived from the protocolClass(es) 
+         * we register all interfaces extended by the protocolImpl
+         */
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        Method method = protocol.getMethod(call.getMethodName(),
-                                           call.getParameterClasses());
-        method.setAccessible(true);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
+      } else {
+        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+          throw new IOException("protocolClass "+ protocolClass +
+              " is not implemented by protocolImpl which is of class " +
+              protocolImpl.getClass());
         }
-        
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+        // register protocol class and its super interfaces
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
+      }
+      for (Class<?> p : protocols) {
+        if (!p.equals(VersionedProtocol.class)) {
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
+        }
+      }
+
+    }
+
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
+    }
+    
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
+
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
+
           long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
+            if (highest == null) {
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
+            }
           }
-        }
+          
 
-        long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          // Invoke the protocol method
+
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
     }
   }
 
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
+  @Override
+  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+      ConnectionId connId, Configuration conf, SocketFactory factory)
+      throws IOException {
+    throw new UnsupportedOperationException("This proxy is not supported");
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java Mon Feb 27 03:58:10 2012
@@ -42,15 +42,20 @@ public class DelegationKey implements Wr
   @Nullable
   private byte[] keyBytes = null;
 
+  /** Default constructore required for Writable */
   public DelegationKey() {
-    this(0, 0L, null);
+    this(0, 0L, (SecretKey)null);
   }
 
   public DelegationKey(int keyId, long expiryDate, SecretKey key) {
+    this(keyId, expiryDate, key != null ? key.getEncoded() : null);
+  }
+  
+  public DelegationKey(int keyId, long expiryDate, byte[] encodedKey) {
     this.keyId = keyId;
     this.expiryDate = expiryDate;
-    if (key!=null) {
-      this.keyBytes = key.getEncoded();
+    if (encodedKey != null) {
+      this.keyBytes = encodedKey;
     }
   }
 
@@ -70,6 +75,10 @@ public class DelegationKey implements Wr
       return key;
     }
   }
+  
+  public byte[] getEncodedKey() {
+    return keyBytes;
+  }
 
   public void setExpiryDate(long expiryDate) {
     this.expiryDate = expiryDate;

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java Mon Feb 27 03:58:10 2012
@@ -94,7 +94,7 @@ public abstract class GetGroupsBase exte
    * @return A {@link GetUserMappingsProtocol} client proxy.
    * @throws IOException
    */
-  private GetUserMappingsProtocol getUgmProtocol() throws IOException {
+  protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
     GetUserMappingsProtocol userGroupMappingProtocol =
       RPC.getProxy(GetUserMappingsProtocol.class, 
           GetUserMappingsProtocol.versionID,

Propchange: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 27 03:58:10 2012
@@ -1,3 +1,4 @@
+/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/core:1227776-1294004
 /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166009,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1183132,1189613,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456,1239752,1240897,1240928,1243065,1243104,1244766,1245751,1245762,1293419
 /hadoop/core/branches/branch-0.19/core/src/test/core:713112
 /hadoop/core/trunk/src/test/core:776175-785643,785929-786278

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Mon Feb 27 03:58:10 2012
@@ -57,6 +57,11 @@ public class TestFailoverProxy {
     public Class<?> getInterface() {
       return iface;
     }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do.
+    }
     
   }
   

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Feb 27 03:58:10 2012
@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
@@ -96,8 +97,8 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
         // sleep a bit
         try {

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon Feb 27 03:58:10 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 
 /**
@@ -72,8 +73,8 @@ public class TestIPCServerResponder exte
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
         try {
           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Feb 27 03:58:10 2012
@@ -18,28 +18,39 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Arrays;
 
-import junit.framework.TestCase;
+import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
@@ -49,18 +60,22 @@ import static org.apache.hadoop.test.Met
 import static org.mockito.Mockito.*;
 
 /** Unit tests for RPC. */
-public class TestRPC extends TestCase {
+@SuppressWarnings("deprecation")
+public class TestRPC {
   private static final String ADDRESS = "0.0.0.0";
 
   public static final Log LOG =
     LogFactory.getLog(TestRPC.class);
   
   private static Configuration conf = new Configuration();
+  
+  static {
+    conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
+        StoppedRpcEngine.class, RpcEngine.class);
+  }
 
   int datasize = 1024*100;
   int numThreads = 50;
-
-  public TestRPC(String name) { super(name); }
 	
   public interface TestProtocol extends VersionedProtocol {
     public static final long versionID = 1L;
@@ -207,6 +222,80 @@ public class TestRPC extends TestCase {
     }
   }
   
+  /**
+   * A basic interface for testing client-side RPC resource cleanup.
+   */
+  private static interface StoppedProtocol {
+    long versionID = 0;
+
+    public void stop();
+  }
+  
+  /**
+   * A class used for testing cleanup of client side RPC resources.
+   */
+  private static class StoppedRpcEngine implements RpcEngine {
+
+    @Override
+    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+        UserGroupInformation ticket, Configuration conf)
+        throws IOException, InterruptedException {
+      return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout) throws IOException {
+      T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+              new Class[] { protocol }, new StoppedInvocationHandler());
+      return new ProtocolProxy<T>(protocol, proxy, false);
+    }
+
+    @Override
+    public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+        Object instance, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+        ConnectionId connId, Configuration conf, SocketFactory factory)
+        throws IOException {
+      throw new UnsupportedOperationException("This proxy is not supported");
+    }
+  }
+
+  /**
+   * An invocation handler which does nothing when invoking methods, and just
+   * counts the number of times close() is called.
+   */
+  private static class StoppedInvocationHandler
+      implements InvocationHandler, Closeable {
+    
+    private int closeCalled = 0;
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+          return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCalled++;
+    }
+    
+    public int getCloseCalled() {
+      return closeCalled;
+    }
+    
+  }
+  
+  @Test
   public void testConfRpc() throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, 1, false, conf, null);
@@ -229,6 +318,7 @@ public class TestRPC extends TestCase {
     server.stop();    
   }
 
+  @Test
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     // create a server with two handlers
@@ -273,11 +363,12 @@ public class TestRPC extends TestCase {
     }
   }
   
-  public void testRPCConf(Configuration conf) throws Exception {
-    
+  @Test
+  public void testCalls() throws Exception {
+    testCallsInternal(conf);
   }
-
-  public void testCalls(Configuration conf) throws Exception {
+  
+  private void testCallsInternal(Configuration conf) throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
@@ -384,6 +475,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testStandaloneClient() throws IOException {
     try {
       TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
@@ -450,6 +542,7 @@ public class TestRPC extends TestCase {
     }
   }
   
+  @Test
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -481,20 +574,48 @@ public class TestRPC extends TestCase {
     Configuration conf = new Configuration();
     
     conf.setBoolean("ipc.client.ping", false);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
     
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
   }
 
   /**
    * Test stopping a non-registered proxy
    * @throws Exception
    */
+  @Test
   public void testStopNonRegisteredProxy() throws Exception {
     RPC.stopProxy(mock(TestProtocol.class));
   }
   
+  @Test
+  public void testStopProxy() throws IOException {
+    StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
+  public void testWrappedStopProxy() throws IOException {
+    StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(wrappedProxy);
+    
+    StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
+        wrappedProxy, RetryPolicies.RETRY_FOREVER);
+    
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
   public void testErrorMsgForInsecureClient() throws Exception {
     final Server server = RPC.getServer(TestProtocol.class,
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -567,10 +688,10 @@ public class TestRPC extends TestCase {
     return count;
   }
 
-
   /**
    * Test that server.stop() properly stops all threads
    */
+  @Test
   public void testStopsAllThreads() throws Exception {
     int threadsBefore = countThreads("Server$Listener$Reader");
     assertEquals("Expect no Reader threads running before test",
@@ -591,8 +712,7 @@ public class TestRPC extends TestCase {
   }
   
   public static void main(String[] args) throws Exception {
-
-    new TestRPC("test").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
 
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon Feb 27 03:58:10 2012
@@ -31,6 +31,10 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
 import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -39,7 +43,7 @@ import org.junit.Test;
 public class TestRPCCompatibility {
   private static final String ADDRESS = "0.0.0.0";
   private static InetSocketAddress addr;
-  private static Server server;
+  private static RPC.Server server;
   private ProtocolProxy<?> proxy;
 
   public static final Log LOG =
@@ -52,10 +56,14 @@ public class TestRPCCompatibility {
     void ping() throws IOException;    
   }
   
-  public interface TestProtocol1 extends TestProtocol0 {
+  public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
     String echo(String value) throws IOException;
   }
 
+  
+  // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol2 extends TestProtocol1 {
     int echo(int value)  throws IOException;
   }
@@ -89,28 +97,44 @@ public class TestRPCCompatibility {
   public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
     @Override
     public String echo(String value) { return value; }
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+        return TestProtocol1.versionID;
+    }
   }
 
   public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
     @Override
     public int echo(int value) { return value; }
+
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+      return TestProtocol2.versionID;
+    }
+
   }
   
   @After
   public void tearDown() throws IOException {
     if (proxy != null) {
       RPC.stopProxy(proxy.getProxy());
+      proxy = null;
     }
     if (server != null) {
       server.stop();
+      server = null;
     }
   }
   
   @Test  // old client vs new server
   public void testVersion0ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                            impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -154,8 +178,10 @@ public class TestRPCCompatibility {
     
     public int echo(int value) throws IOException, NumberFormatException {
       if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
         return -value;  // use version 3 echo long
       } else { // server is version 2
+System.out.println("echo int is NOT supported");
         return Integer.parseInt(proxy2.echo(String.valueOf(value)));
       }
     }
@@ -172,8 +198,10 @@ public class TestRPCCompatibility {
   @Test // Compatible new client & old server
   public void testVersion2ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                              impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -189,9 +217,12 @@ public class TestRPCCompatibility {
   
   @Test // equal version client and server
   public void testVersion2ClientVersion2Server() throws Exception {
+    ProtocolSignature.resetCache();
     // create a server with two handlers
+    TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
-                              new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+                             impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -250,14 +281,16 @@ public class TestRPCCompatibility {
     assertEquals(hash1, hash2);
   }
   
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol4 extends TestProtocol2 {
-    public static final long versionID = 1L;
+    public static final long versionID = 4L;
     int echo(int value)  throws IOException;
   }
   
   @Test
   public void testVersionMismatch() throws IOException {
-    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
         false, conf, null);
     server.start();
     addr = NetUtils.getConnectAddress(server);
@@ -268,7 +301,76 @@ public class TestRPCCompatibility {
       proxy.echo(21);
       fail("The call must throw VersionMismatch exception");
     } catch (IOException ex) {
-      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+      Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), 
+          ex.getMessage().contains("VersionMismatch"));
+    }
+  }
+  
+  @Test
+  public void testIsMethodSupported() throws IOException {
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
+        false, conf, null);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+        TestProtocol2.versionID, addr, conf);
+    boolean supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RpcKind.RPC_WRITABLE,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertTrue(supported);
+    supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertFalse(supported);
+  }
+
+  /**
+   * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+   * the server registry to extract protocol signatures and versions.
+   */
+  @Test
+  public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+    TestImpl1 impl = new TestImpl1();
+    server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
+        conf, null);
+    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+
+    ProtocolMetaInfoServerSideTranslatorPB xlator = 
+        new ProtocolMetaInfoServerSideTranslatorPB(server);
+
+    GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RpcKind.RPC_PROTOCOL_BUFFER));
+    //No signatures should be found
+    Assert.assertEquals(0, resp.getProtocolSignatureCount());
+    resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RpcKind.RPC_WRITABLE));
+    Assert.assertEquals(1, resp.getProtocolSignatureCount());
+    ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+    Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+    boolean found = false;
+    int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+        .getMethod("echo", String.class));
+    for (int m : sig.getMethodsList()) {
+      if (expected == m) {
+        found = true;
+        break;
+      }
     }
+    Assert.assertTrue(found);
+  }
+  
+  private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+      Class<?> protocol, RpcKind rpcKind) {
+    GetProtocolSignatureRequestProto.Builder builder = 
+        GetProtocolSignatureRequestProto.newBuilder();
+    builder.setProtocol(protocol.getName());
+    builder.setRpcKind(rpcKind.toString());
+    return builder.build();
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1294017&r1=1294016&r2=1294017&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Mon Feb 27 03:58:10 2012
@@ -164,6 +164,10 @@ public abstract class MultithreadedTestU
       }
       checkException();
     }
+
+    public Iterable<? extends Thread> getTestThreads() {
+      return testThreads;
+    }
   }
 
   /**