You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/01/11 08:04:07 UTC
svn commit: r1229909 - in
/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common:
./ src/main/java/ src/main/java/org/apache/hadoop/ipc/
Author: szetszwo
Date: Wed Jan 11 07:04:06 2012
New Revision: 1229909
URL: http://svn.apache.org/viewvc?rev=1229909&view=rev
Log:
svn merge -c 1197885 from trunk for HADOOP-7776.
Added:
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
- copied unchanged from r1197885, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
Modified:
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt Wed Jan 11 07:04:06 2012
@@ -21,6 +21,8 @@ Release 0.23-PB - Unreleased
HADOOP-7716 RPC protocol registration on SS does not log the protocol name
(only the class which may be different) (sanjay)
+ HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
+
Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1,5 +1,5 @@
/hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
/hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 11 07:04:06 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
/hadoop/core/branches/branch-0.19/core/src/java:713112
/hadoop/core/trunk/src/core:776175-785643,785929-786278
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Wed Jan 11 07:04:06 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -153,16 +154,20 @@ public class Client {
return refCount==0;
}
- /** A call waiting for a value. */
+ /**
+ * Class that represents an RPC call
+ */
private class Call {
- int id; // call id
- Writable param; // parameter
- Writable value; // value, null if error
- IOException error; // exception, null if value
- boolean done; // true when call is done
-
- protected Call(Writable param) {
- this.param = param;
+ final int id; // call id
+ final Writable rpcRequest; // the serialized rpc request - RpcPayload
+ Writable rpcResponse; // null if rpc has error
+ IOException error; // exception, null if success
+ final RpcKind rpcKind; // Rpc EngineKind
+ boolean done; // true when call is done
+
+ protected Call(RpcKind rpcKind, Writable param) {
+ this.rpcKind = rpcKind;
+ this.rpcRequest = param;
synchronized (Client.this) {
this.id = counter++;
}
@@ -188,15 +193,15 @@ public class Client {
/** Set the return value when there is no error.
* Notify the caller the call is done.
*
- * @param value return value of the call.
+ * @param rpcResponse return value of the rpc call.
*/
- public synchronized void setValue(Writable value) {
- this.value = value;
+ public synchronized void setRpcResponse(Writable rpcResponse) {
+ this.rpcResponse = rpcResponse;
callComplete();
}
- public synchronized Writable getValue() {
- return value;
+ public synchronized Writable getRpcResult() {
+ return rpcResponse;
}
}
@@ -728,6 +733,7 @@ public class Client {
}
}
+ @SuppressWarnings("unused")
public InetSocketAddress getRemoteAddress() {
return server;
}
@@ -788,8 +794,10 @@ public class Client {
//for serializing the
//data to be written
d = new DataOutputBuffer();
- d.writeInt(call.id);
- call.param.write(d);
+ RpcPayloadHeader header = new RpcPayloadHeader(
+ call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
+ header.write(d);
+ call.rpcRequest.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
@@ -826,7 +834,7 @@ public class Client {
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
- call.setValue(value);
+ call.setRpcResponse(value);
calls.remove(id);
} else if (state == Status.ERROR.state) {
call.setException(new RemoteException(WritableUtils.readString(in),
@@ -910,7 +918,7 @@ public class Client {
private int index;
public ParallelCall(Writable param, ParallelResults results, int index) {
- super(param);
+ super(RpcKind.RPC_WRITABLE, param);
this.results = results;
this.index = index;
}
@@ -934,7 +942,7 @@ public class Client {
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
- values[call.index] = call.getValue(); // store the value
+ values[call.index] = call.getRpcResult(); // store the value
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
@@ -994,15 +1002,23 @@ public class Client {
}
}
+ /**
+ * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+ */
+ public Writable call(Writable param, InetSocketAddress address)
+ throws InterruptedException, IOException {
+ return call(RpcKind.RPC_WRITABLE, param, address);
+
+ }
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
- * @deprecated Use {@link #call(Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
- public Writable call(Writable param, InetSocketAddress address)
+ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
- return call(param, address, null);
+ return call(rpcKind, param, address, null);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1010,15 +1026,15 @@ public class Client {
* the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
- public Writable call(Writable param, InetSocketAddress addr,
+ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
conf);
- return call(param, remoteId);
+ return call(rpcKind, param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
@@ -1027,18 +1043,33 @@ public class Client {
* timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
*/
@Deprecated
- public Writable call(Writable param, InetSocketAddress addr,
+ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
- return call(param, remoteId);
+ return call(rpcKind, param, remoteId);
}
+
+ /**
+ * Same as {@link #call(RpcKind, Writable, InetSocketAddress,
+ * Class, UserGroupInformation, int, Configuration)}
+ * except that rpcKind is writable.
+ */
+ public Writable call(Writable param, InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket,
+ int rpcTimeout, Configuration conf)
+ throws InterruptedException, IOException {
+ ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+ ticket, rpcTimeout, conf);
+ return call(RpcKind.RPC_WRITABLE, param, remoteId);
+ }
+
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1047,22 +1078,31 @@ public class Client {
* value. Throws exceptions if there are network problems or if the remote
* code threw an exception.
*/
- public Writable call(Writable param, InetSocketAddress addr,
+ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
- return call(param, remoteId);
+ return call(rpcKind, param, remoteId);
+ }
+
+ /**
+ * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
+ * except the rpcKind is RPC_WRITABLE
+ */
+ public Writable call(Writable param, ConnectionId remoteId)
+ throws InterruptedException, IOException {
+ return call(RpcKind.RPC_WRITABLE, param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server defined by
* <code>remoteId</code>, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
- public Writable call(Writable param, ConnectionId remoteId)
+ public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
- Call call = new Call(param);
+ Call call = new Call(rpcKind, param);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
@@ -1094,7 +1134,7 @@ public class Client {
call.error);
}
} else {
- return call.value;
+ return call.rpcResponse;
}
}
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Wed Jan 11 07:04:06 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
@@ -139,7 +140,7 @@ public class ProtobufRpcEngine implement
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
try {
- val = (RpcResponseWritable) client.call(
+ val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWritable(rpcRequest), remoteId);
} catch (Exception e) {
RpcClientException ce = new RpcClientException("Client exception", e);
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Wed Jan 11 07:04:06 2012
@@ -62,11 +62,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.net.NetUtils;
@@ -108,7 +110,8 @@ public abstract class Server {
// 4 : Introduced SASL security layer
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
- public static final byte CURRENT_VERSION = 5;
+ // 6 : Made RPC payload header explicit
+ public static final byte CURRENT_VERSION = 6;
/**
* Initial and max size of response buffer
@@ -261,28 +264,33 @@ public abstract class Server {
/** A call queued for handling. */
private static class Call {
- private int id; // the client's call id
- private Writable param; // the parameter passed
- private Connection connection; // connection to client
- private long timestamp; // the time received when response is null
- // the time served when response is not null
- private ByteBuffer response; // the response for this call
-
- public Call(int id, Writable param, Connection connection) {
- this.id = id;
- this.param = param;
+ private final int callId; // the client's call id
+ private final Writable rpcRequest; // Serialized Rpc request from client
+ private final Connection connection; // connection to client
+ private long timestamp; // time received when response is null
+ // time served when response is not null
+ private ByteBuffer rpcResponse; // the response for this call
+ private final RpcKind rpcKind;
+
+ public Call(int id, Writable param, Connection connection) {
+ this( id, param, connection, RpcKind.RPC_BUILTIN );
+ }
+ public Call(int id, Writable param, Connection connection, RpcKind kind) {
+ this.callId = id;
+ this.rpcRequest = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
- this.response = null;
+ this.rpcResponse = null;
+ this.rpcKind = kind;
}
@Override
public String toString() {
- return param.toString() + " from " + connection.toString();
+ return rpcRequest.toString() + " from " + connection.toString();
}
public void setResponse(ByteBuffer response) {
- this.response = response;
+ this.rpcResponse = response;
}
}
@@ -781,17 +789,17 @@ public abstract class Server {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection);
}
//
// Send as much data as we can in the non-blocking fashion
//
- int numBytes = channelWrite(channel, call.response);
+ int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
- if (!call.response.hasRemaining()) {
+ if (!call.rpcResponse.hasRemaining()) {
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
@@ -799,7 +807,7 @@ public abstract class Server {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
@@ -827,7 +835,7 @@ public abstract class Server {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ LOG.debug(getName() + ": responding to #" + call.callId + " from " +
call.connection + " Wrote partial " + numBytes +
" bytes.");
}
@@ -1377,18 +1385,24 @@ public abstract class Server {
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
- int id = dis.readInt(); // try to read an id
+ RpcPayloadHeader header = new RpcPayloadHeader();
+ header.readFields(dis); // Read the RpcPayload header
if (LOG.isDebugEnabled())
- LOG.debug(" got #" + id);
- Writable param;
- try {
- param = ReflectionUtils.newInstance(paramClass, conf);//read param
- param.readFields(dis);
+ LOG.debug(" got #" + header.getCallId());
+ if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
+ throw new IOException("IPC Server does not implement operation" +
+ header.getOperation());
+ }
+ Writable rpcRequest;
+ try { //Read the rpc request
+ rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+ rpcRequest.readFields(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t);
- final Call readParamsFailedCall = new Call(id, null, this);
+ final Call readParamsFailedCall =
+ new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
@@ -1398,7 +1412,7 @@ public abstract class Server {
return;
}
- Call call = new Call(id, param, this);
+ Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
@@ -1462,8 +1476,8 @@ public abstract class Server {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": has #" + call.id + " from " +
- call.connection);
+ LOG.debug(getName() + ": has Call#" + call.callId +
+ "for RpcKind " + call.rpcKind + " from " + call.connection);
String errorClass = null;
String error = null;
@@ -1474,7 +1488,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
- value = call(call.connection.protocolName, call.param,
+ value = call(call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
@@ -1484,7 +1498,7 @@ public abstract class Server {
public Writable run() throws Exception {
// make the call
return call(call.connection.protocolName,
- call.param, call.timestamp);
+ call.rpcRequest, call.timestamp);
}
}
@@ -1634,7 +1648,7 @@ public abstract class Server {
throws IOException {
response.reset();
DataOutputStream out = new DataOutputStream(response);
- out.writeInt(call.id); // write call id
+ out.writeInt(call.callId); // write call id
out.writeInt(status.state); // write status
if (status == Status.SUCCESS) {
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1229909&r1=1229908&r2=1229909&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Wed Jan 11 07:04:06 2012
@@ -39,6 +39,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@@ -242,7 +243,7 @@ public class WritableRpcEngine implement
}
ObjectWritable value = (ObjectWritable)
- client.call(new Invocation(method, args), remoteId);
+ client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);