You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2012/04/23 18:34:23 UTC

svn commit: r1329319 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/util/ src/main/proto/ src/test/java/org/apache/hadoop/ipc/

Author: sradia
Date: Mon Apr 23 16:34:21 2012
New Revision: 1329319

URL: http://svn.apache.org/viewvc?rev=1329319&view=rev
Log:
    HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
Removed:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Apr 23 16:34:21 2012
@@ -63,6 +63,8 @@ Trunk (unreleased changes)
 
     HADOOP-8290. Remove remaining references to hadoop.native.lib (harsh)
 
+    HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
+
   BUG FIXES
 
     HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Mon Apr 23 16:34:21 2012
@@ -282,8 +282,13 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
     </Match>
+        <Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcPayloadHeaderProtos.*"/>
+    </Match>
     <Match>
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
     </Match>
+
  </FindBugsFilter>

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Apr 23 16:34:21 2012
@@ -50,8 +50,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -163,10 +164,10 @@ public class Client {
     final Writable rpcRequest;  // the serialized rpc request - RpcPayload
     Writable rpcResponse;       // null if rpc has error
     IOException error;          // exception, null if success
-    final RpcKind rpcKind;      // Rpc EngineKind
+    final RPC.RpcKind rpcKind;      // Rpc EngineKind
     boolean done;               // true when call is done
 
-    protected Call(RpcKind rpcKind, Writable param) {
+    protected Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
       this.rpcRequest = param;
       synchronized (Client.this) {
@@ -613,7 +614,7 @@ public class Client {
             this.in = new DataInputStream(new BufferedInputStream(inStream));
           }
           this.out = new DataOutputStream(new BufferedOutputStream(outStream));
-          writeHeader();
+          writeConnectionContext();
 
           // update last activity time
           touch();
@@ -704,16 +705,17 @@ public class Client {
       out.flush();
     }
     
-    /* Write the protocol header for each connection
+    /* Write the connection context header for each connection
      * Out is not synchronized because only the first thread does this.
      */
-    private void writeHeader() throws IOException {
+    private void writeConnectionContext() throws IOException {
       // Write out the ConnectionHeader
       DataOutputBuffer buf = new DataOutputBuffer();
       connectionContext.writeTo(buf);
       
       // Write out the payload length
       int bufLen = buf.getLength();
+
       out.writeInt(bufLen);
       out.write(buf.getData(), 0, bufLen);
     }
@@ -806,21 +808,22 @@ public class Client {
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
           
-          //for serializing the
-          //data to be written
+          // Serializing the data to be written.
+          // Format:
+          // 0) Length of rest below (1 + 2)
+          // 1) PayloadHeader - written with writeDelimitedTo, so it carries its own length prefix
+          // 2) the Payload - the RpcRequest
+          //
           d = new DataOutputBuffer();
-          d.writeInt(0); // placeholder for data length
-          RpcPayloadHeader header = new RpcPayloadHeader(
-              call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
-          header.write(d);
+          RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
+             call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
+          header.writeDelimitedTo(d);
           call.rpcRequest.write(d);
           byte[] data = d.getData();
-          int dataLength = d.getLength() - 4;
-          data[0] = (byte)((dataLength >>> 24) & 0xff);
-          data[1] = (byte)((dataLength >>> 16) & 0xff);
-          data[2] = (byte)((dataLength >>> 8) & 0xff);
-          data[3] = (byte)(dataLength & 0xff);
-          out.write(data, 0, dataLength + 4);//write the data
+   
+          int totalLength = d.getLength();
+          out.writeInt(totalLength); // Total Length
+          out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
           out.flush();
         }
       } catch(IOException e) {
@@ -937,7 +940,7 @@ public class Client {
     private int index;
     
     public ParallelCall(Writable param, ParallelResults results, int index) {
-      super(RpcKind.RPC_WRITABLE, param);
+      super(RPC.RpcKind.RPC_WRITABLE, param);
       this.results = results;
       this.index = index;
     }
@@ -1022,22 +1025,22 @@ public class Client {
   }
 
   /**
-   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
    *  for RPC_BUILTIN
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-    return call(RpcKind.RPC_BUILTIN, param, address);
+    return call(RPC.RpcKind.RPC_BUILTIN, param, address);
     
   }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   * @deprecated Use {@link #call(RPC.RpcKind, Writable,
    *  ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
+  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
       return call(rpcKind, param, address, null);
   }
@@ -1047,11 +1050,11 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, 
+   * @deprecated Use {@link #call(RPC.RpcKind, Writable, 
    * ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
+  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
       UserGroupInformation ticket)  
       throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
@@ -1065,11 +1068,11 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   * @deprecated Use {@link #call(RPC.RpcKind, Writable,
    *  ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
+  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout)  
                        throws InterruptedException, IOException {
@@ -1080,7 +1083,7 @@ public class Client {
 
   
   /**
-   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, 
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    */
@@ -1090,7 +1093,7 @@ public class Client {
       throws InterruptedException, IOException {
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-    return call(RpcKind.RPC_BUILTIN, param, remoteId);
+    return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /**
@@ -1101,7 +1104,7 @@ public class Client {
    * value. Throws exceptions if there are network problems or if the remote
    * code threw an exception.
    */
-  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
+  public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout, Configuration conf)  
                        throws InterruptedException, IOException {
@@ -1111,12 +1114,12 @@ public class Client {
   }
   
   /**
-   * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
    * except the rpcKind is RPC_BUILTIN
    */
   public Writable call(Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-     return call(RpcKind.RPC_BUILTIN, param, remoteId);
+     return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /** 
@@ -1130,7 +1133,7 @@ public class Client {
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
    */
-  public Writable call(RpcKind rpcKind, Writable rpcRequest,
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
       ConnectionId remoteId) throws InterruptedException, IOException {
     Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Apr 23 16:34:21 2012
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.DataOutputOu
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -61,7 +60,7 @@ public class ProtobufRpcEngine implement
   
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
-        RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
         new Server.ProtoBufRpcInvoker());
   }
 
@@ -182,7 +181,7 @@ public class ProtobufRpcEngine implement
       HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
       RpcResponseWritable val = null;
       try {
-        val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
+        val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcRequestWritable(rpcRequest), remoteId);
       } catch (Throwable e) {
         throw new ServiceException(e);
@@ -351,7 +350,7 @@ public class ProtobufRpcEngine implement
           numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
               .getClass().getName()), secretManager, portRangeConfig);
       this.verbose = verbose;  
-      registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
+      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
           protocolImpl);
     }
     
@@ -363,10 +362,10 @@ public class ProtobufRpcEngine implement
           String protoName, long version) throws IOException {
         ProtoNameVer pv = new ProtoNameVer(protoName, version);
         ProtoClassProtoImpl impl = 
-            server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+            server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
         if (impl == null) { // no match for Protocol AND Version
           VerProtocolImpl highest = 
-              server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, 
+              server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, 
                   protoName);
           if (highest == null) {
             throw new IOException("Unknown protocol: " + protoName);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java Mon Apr 23 16:34:21 2012
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto;
@@ -49,7 +48,7 @@ public class ProtocolMetaInfoServerSideT
     String protocol = request.getProtocol();
     GetProtocolVersionsResponseProto.Builder builder = 
         GetProtocolVersionsResponseProto.newBuilder();
-    for (RpcKind r : RpcKind.values()) {
+    for (RPC.RpcKind r : RPC.RpcKind.values()) {
       long[] versions;
       try {
         versions = getProtocolVersionForRpcKind(r, protocol);
@@ -78,7 +77,7 @@ public class ProtocolMetaInfoServerSideT
     String rpcKind = request.getRpcKind();
     long[] versions;
     try {
-      versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind),
+      versions = getProtocolVersionForRpcKind(RPC.RpcKind.valueOf(rpcKind),
           protocol);
     } catch (ClassNotFoundException e1) {
       throw new ServiceException(e1);
@@ -104,7 +103,7 @@ public class ProtocolMetaInfoServerSideT
     return builder.build();
   }
   
-  private long[] getProtocolVersionForRpcKind(RpcKind rpcKind,
+  private long[] getProtocolVersionForRpcKind(RPC.RpcKind rpcKind,
       String protocol) throws ClassNotFoundException {
     Class<?> protocolClass = Class.forName(protocol);
     String protocolName = RPC.getProtocolName(protocolClass);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Apr 23 16:34:21 2012
@@ -42,7 +42,6 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.Client.ConnectionId;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -73,6 +72,18 @@ import com.google.protobuf.BlockingServi
  * the protocol instance is transmitted.
  */
 public class RPC {
+  public enum RpcKind { // kind of RPC engine used for a call; each constant carries a short value
+    RPC_BUILTIN ((short) 1),         // Used for built-in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
+    final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // largest value; used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value; // smallest value
+    public final short value; //TODO make it private
+
+    RpcKind(short val) { // val: the short value stored for this kind
+      this.value = val;
+    } 
+  }
   
   interface RpcInvoker {   
     /**
@@ -777,7 +788,7 @@ public class RPC {
    ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
        new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
    
-   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
      if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
        for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
          protocolImplMapArray.add(
@@ -821,7 +832,7 @@ public class RPC {
    
    
    @SuppressWarnings("unused") // will be useful later.
-   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+   VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind,
        String protocolName) {
      VerProtocolImpl[] resultk = 
          new  VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
@@ -900,7 +911,7 @@ public class RPC {
     }
     
     @Override
-    public Writable call(RpcKind rpcKind, String protocol,
+    public Writable call(RPC.RpcKind rpcKind, String protocol,
         Writable rpcRequest, long receiveTime) throws Exception {
       return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
           receiveTime);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java Mon Apr 23 16:34:21 2012
@@ -27,7 +27,6 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
@@ -107,7 +106,7 @@ public class RpcClientUtil {
    * @throws IOException
    */
   public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
-      RpcKind rpcKind, long version, String methodName) throws IOException {
+      RPC.RpcKind rpcKind, long version, String methodName) throws IOException {
     InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy);
     Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap(
         serverAddress, protocol.getName(), rpcKind.toString());

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Apr 23 16:34:21 2012
@@ -72,11 +72,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -170,8 +169,8 @@ public abstract class Server {
       this.rpcRequestWrapperClass = rpcRequestWrapperClass;
     }   
   }
-  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
-      HashMap<RpcKind, RpcKindMapValue>(4);
+  static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RPC.RpcKind, RpcKindMapValue>(4);
   
   
 
@@ -185,7 +184,7 @@ public abstract class Server {
    *  @param rpcInvoker - use to process the calls on SS.
    */
   
-  public static void registerProtocolEngine(RpcKind rpcKind, 
+  public static void registerProtocolEngine(RPC.RpcKind rpcKind, 
           Class<? extends Writable> rpcRequestWrapperClass,
           RpcInvoker rpcInvoker) {
     RpcKindMapValue  old = 
@@ -201,14 +200,14 @@ public abstract class Server {
   }
   
   public Class<? extends Writable> getRpcRequestWrapper(
-      RpcKind rpcKind) {
+      RpcKindProto rpcKind) {
     if (rpcRequestClass != null)
        return rpcRequestClass;
-    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind));
     return (val == null) ? null : val.rpcRequestWrapperClass; 
   }
   
-  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+  public static RpcInvoker  getRpcInvoker(RPC.RpcKind rpcKind) {
     RpcKindMapValue val = rpcKindMap.get(rpcKind);
     return (val == null) ? null : val.rpcInvoker; 
   }
@@ -403,12 +402,12 @@ public abstract class Server {
     private long timestamp;               // time received when response is null
                                           // time served when response is not null
     private ByteBuffer rpcResponse;       // the response for this call
-    private final RpcKind rpcKind;
+    private final RPC.RpcKind rpcKind;
 
     public Call(int id, Writable param, Connection connection) {
-      this( id,  param,  connection, RpcKind.RPC_BUILTIN );    
+      this( id,  param,  connection, RPC.RpcKind.RPC_BUILTIN );    
     }
-    public Call(int id, Writable param, Connection connection, RpcKind kind) { 
+    public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { 
       this.callId = id;
       this.rpcRequest = param;
       this.connection = connection;
@@ -1366,7 +1365,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-       
           if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
             // covers the !useSasl too
             dataLengthBuffer.clear();
@@ -1555,22 +1553,27 @@ public abstract class Server {
     private void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
-      RpcPayloadHeader header = new RpcPayloadHeader();
-      header.readFields(dis);           // Read the RpcPayload header
+      RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis);
         
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + header.getCallId());
-      if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+      if (!header.hasRpcOp()) {
+        throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader");
+      }
+      if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) {
         throw new IOException("IPC Server does not implement operation" + 
-              header.getOperation());
+              header.getRpcOp());
       }
       // If we know the rpc kind, get its class so that we can deserialize
       // (Note it would make more sense to have the handler deserialize but 
       // we continue with this original design.
+      if (!header.hasRpcKind()) {
+        throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader");
+      }
       Class<? extends Writable> rpcRequestClass = 
-          getRpcRequestWrapper(header.getkind());
+          getRpcRequestWrapper(header.getRpcKind());
       if (rpcRequestClass == null) {
-        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+        LOG.warn("Unknown rpc kind "  + header.getRpcKind() + 
             " from client " + getHostAddress());
         final Call readParamsFailedCall = 
             new Call(header.getCallId(), null, this);
@@ -1578,7 +1581,7 @@ public abstract class Server {
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
             IOException.class.getName(),
-            "Unknown rpc kind "  + header.getkind());
+            "Unknown rpc kind "  + header.getRpcKind());
         responder.doRespond(readParamsFailedCall);
         return;   
       }
@@ -1589,7 +1592,7 @@ public abstract class Server {
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress() + "on connection protocol " +
-            this.protocolName + " for rpcKind " + header.getkind(),  t);
+            this.protocolName + " for rpcKind " + header.getRpcKind(),  t);
         final Call readParamsFailedCall = 
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@@ -1601,7 +1604,8 @@ public abstract class Server {
         return;
       }
         
-      Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
+      Call call = new Call(header.getCallId(), rpcRequest, this, 
+          ProtoUtil.convert(header.getRpcKind()));
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
@@ -1991,11 +1995,11 @@ public abstract class Server {
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws Exception {
-    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
+    return call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(RpcKind rpcKind, String protocol,
+  public abstract Writable call(RPC.RpcKind rpcKind, String protocol,
       Writable param, long receiveTime) throws Exception;
   
   /**

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Apr 23 16:34:21 2012
@@ -33,7 +33,6 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -75,7 +74,7 @@ public class WritableRpcEngine implement
    * Register the rpcRequest deserializer for WritableRpcEngine
    */
   private static synchronized void initialize() {
-    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
         Invocation.class, new Server.WritableRpcInvoker());
     isInitialized = true;
   }
@@ -223,7 +222,7 @@ public class WritableRpcEngine implement
       }
 
       ObjectWritable value = (ObjectWritable)
-        client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
+        client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -412,12 +411,12 @@ public class WritableRpcEngine implement
               protocolImpl.getClass());
         }
         // register protocol class and its super interfaces
-        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
         protocols = RPC.getProtocolInterfaces(protocolClass);
       }
       for (Class<?> p : protocols) {
         if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
+          registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
         }
       }
 
@@ -461,7 +460,7 @@ public class WritableRpcEngine implement
             // registered directly.
             // Send the call to the highest  protocol version
             VerProtocolImpl highest = server.getHighestSupportedProtocol(
-                RpcKind.RPC_WRITABLE, protocolName);
+                RPC.RpcKind.RPC_WRITABLE, protocolName);
             if (highest == null) {
               throw new IOException("Unknown protocol: " + protocolName);
             }
@@ -473,10 +472,10 @@ public class WritableRpcEngine implement
             ProtoNameVer pv = 
                 new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
             protocolImpl = 
-                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+                server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
             if (protocolImpl == null) { // no match for Protocol AND Version
                VerProtocolImpl highest = 
-                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                   server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, 
                        protoName);
               if (highest == null) {
                 throw new IOException("Unknown protocol: " + protoName);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java Mon Apr 23 16:34:21 2012
@@ -21,8 +21,10 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -135,4 +137,30 @@ public abstract class ProtoUtil {
     }
     return ugi;
   }
+  
+  static RpcKindProto convert(RPC.RpcKind kind) {
+    switch (kind) {
+    case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN;
+    case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE;
+    case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER;
+    }
+    return null;
+  }
+  
+  
+  public static RPC.RpcKind convert( RpcKindProto kind) {
+    switch (kind) {
+    case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN;
+    case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE;
+    case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER;
+    }
+    return null;
+  }
+ 
+  public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind,
+      RpcPayloadOperationProto operation, int callId) {
+    RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder();
+    result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
+    return result.build();
+  }
 }

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto?rev=1329319&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto Mon Apr 23 16:34:21 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.ipc.protobuf";
+option java_outer_classname = "RpcPayloadHeaderProtos";
+option java_generate_equals_and_hash = true;
+
+
+/**
+ * This is the rpc payload header. It is sent with every rpc call.
+ * 
+ * The format of RPC call is as follows:
+ * +-----------------------------------------------------+
+ * |  Rpc length in bytes                                |
+ * +-----------------------------------------------------+
+ * | RpcPayloadHeader - serialized, length-delimited     |
+ * +-----------------------------------------------------+
+ * |  RpcRequest Payload                                 |
+ * +-----------------------------------------------------+
+ *
+ */
+
+
+
+/**
+ * RpcKind determines the rpcEngine and the serialization of the rpc payload
+ */
+enum RpcKindProto {
+  RPC_BUILTIN          = 0;  // Used for built in calls by tests
+  RPC_WRITABLE         = 1;  // Use WritableRpcEngine 
+  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
+}
+
+enum RpcPayloadOperationProto {
+  RPC_FINAL_PAYLOAD        = 0; // The final payload
+  RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet
+  RPC_CLOSE_CONNECTION     = 2; // close the rpc connection
+}
+   
+message RpcPayloadHeaderProto { // the header for the RpcRequest
+  optional RpcKindProto rpcKind = 1;
+  optional RpcPayloadOperationProto rpcOp = 2;
+  optional uint32 callId = 3; // each rpc has a callId that is also used in response
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Apr 23 16:34:21 2012
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
@@ -99,7 +98,7 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+    public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
         long receiveTime) throws IOException {
       if (sleep) {
         // sleep a bit

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon Apr 23 16:34:21 2012
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 
 /**
@@ -73,7 +72,7 @@ public class TestIPCServerResponder exte
     }
 
     @Override
-    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+    public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
         long receiveTime) throws IOException {
       if (sleep) {
         try {

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java Mon Apr 23 16:34:21 2012
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
 import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@@ -178,9 +177,9 @@ public class TestMultipleProtocolServer 
     // create a server with two handlers
     server = RPC.getServer(Foo0.class,
                               new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
-    server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
-    server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
     
     
     // Add Protobuf server
@@ -189,7 +188,7 @@ public class TestMultipleProtocolServer 
         new PBServerImpl();
     BlockingService service = TestProtobufRpcProto
         .newReflectiveBlockingService(pbServerImpl);
-    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
         service);
     server.start();
     addr = NetUtils.getConnectAddress(server);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java Mon Apr 23 16:34:21 2012
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@@ -122,7 +121,7 @@ public class TestProtoBufRpc {
     BlockingService service2 = TestProtobufRpc2Proto
         .newReflectiveBlockingService(server2Impl);
     
-    server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
+    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
         service2);
     server.start();
   }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1329319&r1=1329318&r2=1329319&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon Apr 23 16:34:21 2012
@@ -31,7 +31,6 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
@@ -134,7 +133,7 @@ public class TestRPCCompatibility {
     TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
                             impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -201,7 +200,7 @@ System.out.println("echo int is NOT supp
     TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
                               impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -222,7 +221,7 @@ System.out.println("echo int is NOT supp
     TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
                              impl, ADDRESS, 0, 2, false, conf, null);
-    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -316,11 +315,11 @@ System.out.println("echo int is NOT supp
     TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
         TestProtocol2.versionID, addr, conf);
     boolean supported = RpcClientUtil.isMethodSupported(proxy,
-        TestProtocol2.class, RpcKind.RPC_WRITABLE,
+        TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
         RPC.getProtocolVersion(TestProtocol2.class), "echo");
     Assert.assertTrue(supported);
     supported = RpcClientUtil.isMethodSupported(proxy,
-        TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
+        TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(TestProtocol2.class), "echo");
     Assert.assertFalse(supported);
   }
@@ -334,7 +333,7 @@ System.out.println("echo int is NOT supp
     TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
         conf, null);
-    server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
     server.start();
 
     ProtocolMetaInfoServerSideTranslatorPB xlator = 
@@ -343,13 +342,13 @@ System.out.println("echo int is NOT supp
     GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
         null,
         createGetProtocolSigRequestProto(TestProtocol1.class,
-            RpcKind.RPC_PROTOCOL_BUFFER));
+            RPC.RpcKind.RPC_PROTOCOL_BUFFER));
     //No signatures should be found
     Assert.assertEquals(0, resp.getProtocolSignatureCount());
     resp = xlator.getProtocolSignature(
         null,
         createGetProtocolSigRequestProto(TestProtocol1.class,
-            RpcKind.RPC_WRITABLE));
+            RPC.RpcKind.RPC_WRITABLE));
     Assert.assertEquals(1, resp.getProtocolSignatureCount());
     ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
     Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
@@ -366,7 +365,7 @@ System.out.println("echo int is NOT supp
   }
   
   private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
-      Class<?> protocol, RpcKind rpcKind) {
+      Class<?> protocol, RPC.RpcKind rpcKind) {
     GetProtocolSignatureRequestProto.Builder builder = 
         GetProtocolSignatureRequestProto.newBuilder();
     builder.setProtocol(protocol.getName());