You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/01/11 08:04:07 UTC

svn commit: r1229909 - in /hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common: ./ src/main/java/ src/main/java/org/apache/hadoop/ipc/

Author: szetszwo
Date: Wed Jan 11 07:04:06 2012
New Revision: 1229909

URL: http://svn.apache.org/viewvc?rev=1229909&view=rev
Log:
svn merge -c 1197885 from trunk for HADOOP-7776.

Added:
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
      - copied unchanged from r1197885, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
Modified:
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt Wed Jan 11 07:04:06 2012
@@ -21,6 +21,8 @@ Release 0.23-PB - Unreleased
     HADOOP-7716 RPC protocol registration on SS does not log the protocol name
     (only the class which may be different) (sanjay)
 
+    HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
+
 Release 0.23.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1,5 +1,5 @@
 /hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/core/src/java:713112
 /hadoop/core/trunk/src/core:776175-785643,785929-786278

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Jan 11 07:04:06 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -153,16 +154,20 @@ public class Client {
     return refCount==0;
   }
 
-  /** A call waiting for a value. */
+  /** 
+   * Class that represents an RPC call
+   */
   private class Call {
-    int id;                                       // call id
-    Writable param;                               // parameter
-    Writable value;                               // value, null if error
-    IOException error;                            // exception, null if value
-    boolean done;                                 // true when call is done
-
-    protected Call(Writable param) {
-      this.param = param;
+    final int id;               // call id
+    final Writable rpcRequest;  // the serialized rpc request - RpcPayload
+    Writable rpcResponse;       // null if rpc has error
+    IOException error;          // exception, null if success
+    final RpcKind rpcKind;      // Rpc EngineKind
+    boolean done;               // true when call is done
+
+    protected Call(RpcKind rpcKind, Writable param) {
+      this.rpcKind = rpcKind;
+      this.rpcRequest = param;
       synchronized (Client.this) {
         this.id = counter++;
       }
@@ -188,15 +193,15 @@ public class Client {
     /** Set the return value when there is no error. 
      * Notify the caller the call is done.
      * 
-     * @param value return value of the call.
+     * @param rpcResponse return value of the rpc call.
      */
-    public synchronized void setValue(Writable value) {
-      this.value = value;
+    public synchronized void setRpcResponse(Writable rpcResponse) {
+      this.rpcResponse = rpcResponse;
       callComplete();
     }
     
-    public synchronized Writable getValue() {
-      return value;
+    public synchronized Writable getRpcResult() {
+      return rpcResponse;
     }
   }
 
@@ -728,6 +733,7 @@ public class Client {
       }
     }
 
+    @SuppressWarnings("unused")
     public InetSocketAddress getRemoteAddress() {
       return server;
     }
@@ -788,8 +794,10 @@ public class Client {
           //for serializing the
           //data to be written
           d = new DataOutputBuffer();
-          d.writeInt(call.id);
-          call.param.write(d);
+          RpcPayloadHeader header = new RpcPayloadHeader(
+              call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
+          header.write(d);
+          call.rpcRequest.write(d);
           byte[] data = d.getData();
           int dataLength = d.getLength();
           out.writeInt(dataLength);      //first put the data length
@@ -826,7 +834,7 @@ public class Client {
         if (state == Status.SUCCESS.state) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
-          call.setValue(value);
+          call.setRpcResponse(value);
           calls.remove(id);
         } else if (state == Status.ERROR.state) {
           call.setException(new RemoteException(WritableUtils.readString(in),
@@ -910,7 +918,7 @@ public class Client {
     private int index;
     
     public ParallelCall(Writable param, ParallelResults results, int index) {
-      super(param);
+      super(RpcKind.RPC_WRITABLE, param);
       this.results = results;
       this.index = index;
     }
@@ -934,7 +942,7 @@ public class Client {
 
     /** Collect a result. */
     public synchronized void callComplete(ParallelCall call) {
-      values[call.index] = call.getValue();       // store the value
+      values[call.index] = call.getRpcResult();       // store the value
       count++;                                    // count it
       if (count == size)                          // if all values are in
         notify();                                 // then notify waiting caller
@@ -994,15 +1002,23 @@ public class Client {
     }
   }
 
+  /**
+   * Same as {@link #call(RpcKind, Writable, InetSocketAddress)} with rpcKind set to RPC_WRITABLE
+   */
+  public Writable call(Writable param, InetSocketAddress address)
+  throws InterruptedException, IOException {
+    return call(RpcKind.RPC_WRITABLE, param, address);
+    
+  }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress address)
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-      return call(param, address, null);
+      return call(rpcKind, param, address, null);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1010,15 +1026,15 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
       UserGroupInformation ticket)  
       throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
         conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1027,18 +1043,33 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
    */
   @Deprecated
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout)  
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
   }
 
+  
+  /**
+   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Class, UserGroupInformation, int, Configuration)}
+   * except that rpcKind is RPC_WRITABLE.
+   */
+  public Writable call(Writable param, InetSocketAddress addr, 
+      Class<?> protocol, UserGroupInformation ticket,
+      int rpcTimeout, Configuration conf)  
+      throws InterruptedException, IOException {
+        ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+        ticket, rpcTimeout, conf);
+        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+  }
+  
   /**
    * Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1047,22 +1078,31 @@ public class Client {
    * value. Throws exceptions if there are network problems or if the remote
    * code threw an exception.
    */
-  public Writable call(Writable param, InetSocketAddress addr, 
+  public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
                        int rpcTimeout, Configuration conf)  
                        throws InterruptedException, IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-    return call(param, remoteId);
+    return call(rpcKind, param, remoteId);
+  }
+  
+  /**
+   * Same as {@link #call(RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_WRITABLE
+   */
+  public Writable call(Writable param, ConnectionId remoteId)  
+      throws InterruptedException, IOException {
+     return call(RpcKind.RPC_WRITABLE, param, remoteId);
   }
   
   /** Make a call, passing <code>param</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. */
-  public Writable call(Writable param, ConnectionId remoteId)  
+  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-    Call call = new Call(param);
+    Call call = new Call(rpcKind, param);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
@@ -1094,7 +1134,7 @@ public class Client {
                   call.error);
         }
       } else {
-        return call.value;
+        return call.rpcResponse;
       }
     }
   }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Wed Jan 11 07:04:06 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
@@ -139,7 +140,7 @@ public class ProtobufRpcEngine implement
       HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
       RpcResponseWritable val = null;
       try {
-        val = (RpcResponseWritable) client.call(
+        val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcRequestWritable(rpcRequest), remoteId);
       } catch (Exception e) {
         RpcClientException ce = new RpcClientException("Client exception", e);

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Jan 11 07:04:06 2012
@@ -62,11 +62,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.net.NetUtils;
@@ -108,7 +110,8 @@ public abstract class Server {
   // 4 : Introduced SASL security layer
   // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
   //     in ObjectWritable to efficiently transmit arrays of primitives
-  public static final byte CURRENT_VERSION = 5;
+  // 6 : Made RPC payload header explicit
+  public static final byte CURRENT_VERSION = 6;
 
   /**
    * Initial and max size of response buffer
@@ -261,28 +264,33 @@ public abstract class Server {
 
   /** A call queued for handling. */
   private static class Call {
-    private int id;                               // the client's call id
-    private Writable param;                       // the parameter passed
-    private Connection connection;                // connection to client
-    private long timestamp;     // the time received when response is null
-                                   // the time served when response is not null
-    private ByteBuffer response;                      // the response for this call
-
-    public Call(int id, Writable param, Connection connection) { 
-      this.id = id;
-      this.param = param;
+    private final int callId;             // the client's call id
+    private final Writable rpcRequest;    // Serialized Rpc request from client
+    private final Connection connection;  // connection to client
+    private long timestamp;               // time received when response is null
+                                          // time served when response is not null
+    private ByteBuffer rpcResponse;       // the response for this call
+    private final RpcKind rpcKind;
+
+    public Call(int id, Writable param, Connection connection) {
+      this( id,  param,  connection, RpcKind.RPC_BUILTIN );    
+    }
+    public Call(int id, Writable param, Connection connection, RpcKind kind) { 
+      this.callId = id;
+      this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
-      this.response = null;
+      this.rpcResponse = null;
+      this.rpcKind = kind;
     }
     
     @Override
     public String toString() {
-      return param.toString() + " from " + connection.toString();
+      return rpcRequest.toString() + " from " + connection.toString();
     }
 
     public void setResponse(ByteBuffer response) {
-      this.response = response;
+      this.rpcResponse = response;
     }
   }
 
@@ -781,17 +789,17 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                       call.connection);
           }
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channelWrite(channel, call.response);
+          int numBytes = channelWrite(channel, call.rpcResponse);
           if (numBytes < 0) {
             return true;
           }
-          if (!call.response.hasRemaining()) {
+          if (!call.rpcResponse.hasRemaining()) {
             call.connection.decRpcCount();
             if (numElements == 1) {    // last call fully processes.
               done = true;             // no more data for this channel.
@@ -799,7 +807,7 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -827,7 +835,7 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+              LOG.debug(getName() + ": responding to #" + call.callId + " from " +
                         call.connection + " Wrote partial " + numBytes + 
                         " bytes.");
             }
@@ -1377,18 +1385,24 @@ public abstract class Server {
     private void processData(byte[] buf) throws  IOException, InterruptedException {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcPayloadHeader header = new RpcPayloadHeader();
+      header.readFields(dis);           // Read the RpcPayload header
         
       if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
-      Writable param;
-      try {
-        param = ReflectionUtils.newInstance(paramClass, conf);//read param
-        param.readFields(dis);
+        LOG.debug(" got #" + header.getCallId());
+      if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+        throw new IOException("IPC Server does not implement operation" + 
+              header.getOperation());
+      }
+      Writable rpcRequest;
+      try { //Read the rpc request
+        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
                  getHostAddress(), t);
-        final Call readParamsFailedCall = new Call(id, null, this);
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
         setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1398,7 +1412,7 @@ public abstract class Server {
         return;
       }
         
-      Call call = new Call(id, param, this);
+      Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }
@@ -1462,8 +1476,8 @@ public abstract class Server {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
+            LOG.debug(getName() + ": has Call#" + call.callId + 
+                "for RpcKind " + call.rpcKind + " from " + call.connection);
           
           String errorClass = null;
           String error = null;
@@ -1474,7 +1488,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.param, 
+              value = call(call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1484,7 +1498,7 @@ public abstract class Server {
                      public Writable run() throws Exception {
                        // make the call
                        return call(call.connection.protocolName, 
-                                   call.param, call.timestamp);
+                                   call.rpcRequest, call.timestamp);
 
                      }
                    }
@@ -1634,7 +1648,7 @@ public abstract class Server {
   throws IOException {
     response.reset();
     DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.id);                // write call id
+    out.writeInt(call.callId);                // write call id
     out.writeInt(status.state);           // write status
 
     if (status == Status.SUCCESS) {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Wed Jan 11 07:04:06 2012
@@ -39,6 +39,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -242,7 +243,7 @@ public class WritableRpcEngine implement
       }
 
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), remoteId);
+        client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
       if (LOG.isDebugEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);