You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/01/12 03:54:02 UTC
svn commit: r1230378 [1/2] - in
/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common:
./ src/main/java/ src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/ipc/protobuf/ src/proto/
src/test/java/org/apache/hadoop/...
Author: szetszwo
Date: Thu Jan 12 02:54:01 2012
New Revision: 1230378
URL: http://svn.apache.org/viewvc?rev=1230378&view=rev
Log:
svn merge -c 1210208 from trunk for HADOOP-7862.
Modified:
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1210208,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jan 12 02:54:01 2012
@@ -23,6 +23,9 @@ Release 0.23-PB - Unreleased
HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
+ HADOOP-7862 Move the support for multiple protocols to lower layer so
+ that Writable, PB and Avro can all use it (Sanjay)
+
BUG FIXES
HADOOP-7833. Fix findbugs warnings in protobuf generated code.
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1,5 +1,5 @@
/hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1210208,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
/hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1210208,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
/hadoop/core/branches/branch-0.19/core/src/java:713112
/hadoop/core/trunk/src/core:776175-785643,785929-786278
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements Rp
super((Class)null, new Object(), conf,
bindAddress, port, numHandlers, numReaders,
queueSizePerHandler, verbose, secretManager);
- super.addProtocol(TunnelProtocol.class, responder);
+ // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+ super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
responder.addProtocol(iface, impl);
}
@Override
- public <PROTO, IMPL extends PROTO> Server
- addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+ public Server
+ addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
throws IOException {
responder.addProtocol(protocolClass, protocolImpl);
return this;
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Jan 12 02:54:01 2012
@@ -1003,17 +1003,19 @@ public class Client {
}
/**
- * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+ * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+ * for RPC_BUILTIN
*/
public Writable call(Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
- return call(RpcKind.RPC_WRITABLE, param, address);
+ return call(RpcKind.RPC_BUILTIN, param, address);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1026,7 +1028,8 @@ public class Client {
* the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
@@ -1043,7 +1046,8 @@ public class Client {
* timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
@@ -1057,7 +1061,7 @@ public class Client {
/**
- * Same as {@link #call(RpcKind, Writable, InetSocketAddress,
+ * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
@@ -1067,7 +1071,7 @@ public class Client {
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
- return call(RpcKind.RPC_WRITABLE, param, remoteId);
+ return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
@@ -1088,21 +1092,28 @@ public class Client {
}
/**
- * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
- * except the rpcKind is RPC_WRITABLE
+ * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+ * except the rpcKind is RPC_BUILTIN
*/
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
- return call(RpcKind.RPC_WRITABLE, param, remoteId);
+ return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
- /** Make a call, passing <code>param</code>, to the IPC server defined by
- * <code>remoteId</code>, returning the value.
+ /**
+ * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+ * <code>remoteId</code>, returning the rpc response.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @return the rpc response
* Throws exceptions if there are network problems or if the remote code
- * threw an exception. */
- public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- Call call = new Call(rpcKind, param);
+ * threw an exception.
+ */
+ public Writable call(RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId) throws InterruptedException, IOException {
+ Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceExcept
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+
+ static { // Register the rpcRequest deserializer for ProtobufRpcEngine
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(
+ RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+ new Server.ProtoBufRpcInvoker());
+ }
private static final ClientCache CLIENTS = new ClientCache();
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implement
}
private static class Invoker implements InvocationHandler, Closeable {
- private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+ private final Map<String, Message> returnTypes =
+ new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
- private Client.ConnectionId remoteId;
- private Client client;
+ private final Client.ConnectionId remoteId;
+ private final Client client;
+ private final long clientProtocolVersion;
+ private final String protocolName;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implement
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory,
RpcResponseWritable.class);
+ this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+ this.protocolName = RPC.getProtocolName(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implement
Message param = (Message) params[1];
builder.setRequest(param.toByteString());
+ // For protobuf, {@code protocol} used when creating client side proxy is
+ // the interface extending BlockingInterface, which has the annotations
+ // such as ProtocolName etc.
+ //
+ // Using Method.getDeclaringClass(), as in WritableEngine to get at
+ // the protocol interface will return BlockingInterface, from where
+ // the annotation ProtocolName and Version cannot be
+ // obtained.
+ //
+ // Hence we simply use the protocol class used to create the proxy.
+ // For PB this may limit the use of mixins on client side.
+ builder.setDeclaringClassProtocolName(protocolName);
+ builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implement
RpcResponseWritable.class);
}
+
@Override
- public RPC.Server getServer(Class<?> protocol, Object instance,
+ public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers,
- numReaders, queueSizePerHandler, verbose, secretManager);
+ return new Server(protocol, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
}
private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implement
}
public static class Server extends RPC.Server {
- private BlockingService service;
- private boolean verbose;
-
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length - 1];
- }
-
/**
* Construct an RPC server.
*
- * @param instance the instance whose methods will be called
+ * @param protocolClass the class of protocol
+ * @param protocolImpl the protocolImpl whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
- public Server(Object instance, Configuration conf, String bindAddress,
- int port, int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port, int numHandlers,
+ int numReaders, int queueSizePerHandler, boolean verbose,
+ SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, RpcRequestWritable.class, numHandlers,
- numReaders, queueSizePerHandler, conf, classNameBase(instance
+ numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager);
- this.service = (BlockingService) instance;
- this.verbose = verbose;
+ this.verbose = verbose;
+ registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER,
+ protocolClass, protocolImpl);
}
- /**
- * This is a server side method, which is invoked over RPC. On success
- * the return response has protobuf response payload. On failure, the
- * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
- *
- * In this method there three types of exceptions possible and they are
- * returned in response as follows.
- * <ol>
- * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
- * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
- * this method returns in response the exception thrown by the service.</li>
- * <li> Other exceptions thrown by the service. They are returned as
- * it is.</li>
- * </ol>
- */
- @Override
- public Writable call(String protocol, Writable writableRequest,
- long receiveTime) throws IOException {
- RpcRequestWritable request = (RpcRequestWritable) writableRequest;
- HadoopRpcRequestProto rpcRequest = request.message;
- String methodName = rpcRequest.getMethodName();
- if (verbose)
- LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
- MethodDescriptor methodDescriptor = service.getDescriptorForType()
- .findMethodByName(methodName);
- if (methodDescriptor == null) {
- String msg = "Unknown method " + methodName + " called on " + protocol
- + " protocol.";
- LOG.warn(msg);
- return handleException(new RpcServerException(msg));
- }
- Message prototype = service.getRequestPrototype(methodDescriptor);
- Message param = prototype.newBuilderForType()
- .mergeFrom(rpcRequest.getRequest()).build();
- Message result;
- try {
- result = service.callBlockingMethod(methodDescriptor, null, param);
- } catch (ServiceException e) {
- Throwable cause = e.getCause();
- return handleException(cause != null ? cause : e);
- } catch (Exception e) {
- return handleException(e);
- }
-
- HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
- return new RpcResponseWritable(response);
- }
-
- private RpcResponseWritable handleException(Throwable e) {
+ private static RpcResponseWritable handleException(Throwable e) {
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
.setExceptionName(e.getClass().getName())
.setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implement
return new RpcResponseWritable(response);
}
- private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+ private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
Message message) {
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
.setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implement
.build();
return res;
}
+
+ /**
+ * Protobuf invoker for {@link RpcInvoker}
+ */
+ static class ProtoBufRpcInvoker implements RpcInvoker {
+
+ @Override
+ /**
+ * This is a server side method, which is invoked over RPC. On success
+ * the return response has protobuf response payload. On failure, the
+ * exception name and the stack trace are returned in the response.
+ * See {@link HadoopRpcResponseProto}
+ *
+ * In this method there are three types of exceptions possible and they are
+ * returned in response as follows.
+ * <ol>
+ * <li> Exceptions encountered in this method that are returned
+ * as {@link RpcServerException} </li>
+ * <li> Exceptions thrown by the service are wrapped in ServiceException.
+ * In that case this method returns in response the exception thrown by the
+ * service.</li>
+ * <li> Other exceptions thrown by the service. They are returned as
+ * it is.</li>
+ * </ol>
+ */
+ public Writable call(RPC.Server server, String protocol,
+ Writable writableRequest, long receiveTime) throws IOException {
+ RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+ HadoopRpcRequestProto rpcRequest = request.message;
+ String methodName = rpcRequest.getMethodName();
+ String protoName = rpcRequest.getDeclaringClassProtocolName();
+ long clientVersion = rpcRequest.getClientProtocolVersion();
+ if (server.verbose)
+ LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+
+ ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+ ProtoClassProtoImpl protocolImpl =
+ server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
+ protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ }
+ // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
+
+ BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+ MethodDescriptor methodDescriptor = service.getDescriptorForType()
+ .findMethodByName(methodName);
+ if (methodDescriptor == null) {
+ String msg = "Unknown method " + methodName + " called on " + protocol
+ + " protocol.";
+ LOG.warn(msg);
+ return handleException(new RpcServerException(msg));
+ }
+ Message prototype = service.getRequestPrototype(methodDescriptor);
+ Message param = prototype.newBuilderForType()
+ .mergeFrom(rpcRequest.getRequest()).build();
+ Message result;
+ try {
+ result = service.callBlockingMethod(methodDescriptor, null, param);
+ } catch (ServiceException e) {
+ Throwable cause = e.getCause();
+ return handleException(cause != null ? cause : e);
+ } catch (Exception e) {
+ return handleException(e);
+ }
+
+ HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+ return new RpcResponseWritable(response);
+ }
+ }
}
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Thu Jan 12 02:54:01 2012
@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPol
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
String protocolName(); // the name of the protocol (i.e. rpc service)
+ long protocolVersion() default -1; // default means not defined use old way
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Thu Jan 12 02:54:01 2012
@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
private void fetchServerMethods(Method method) throws IOException {
long clientVersion;
- try {
- Field versionField = method.getDeclaringClass().getField("versionID");
- versionField.setAccessible(true);
- clientVersion = versionField.getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
+ clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
int clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
- .getProtocolSignature(protocol.getName(), clientVersion,
+ .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
clientMethodsHash);
long serverVersion = serverInfo.getVersion();
if (serverVersion != clientVersion) {
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Thu Jan 12 02:54:01 2012
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class ProtocolSignature implements Writable {
static { // register a ctor
WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implement
/**
* A cache that maps a protocol's name to its signature & finger print
*/
- final private static HashMap<String, ProtocolSigFingerprint>
+ private final static HashMap<String, ProtocolSigFingerprint>
PROTOCOL_FINGERPRINT_CACHE =
new HashMap<String, ProtocolSigFingerprint>();
+ @VisibleForTesting
+ public static void resetCache() {
+ PROTOCOL_FINGERPRINT_CACHE.clear();
+ }
+
/**
* Return a protocol's signature and finger print from cache
*
@@ -177,7 +184,7 @@ public class ProtocolSignature implement
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
- String protocolName = protocol.getName();
+ String protocolName = RPC.getProtocolName(protocol);
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
if (sig == null) {
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Thu Jan 12 02:54:01 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.ipc;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.io.*;
import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.Reflection
* the protocol instance is transmitted.
*/
public class RPC {
+
+ interface RpcInvoker {
+ /**
+ * Process a client call on the server side
+ * @param server the server within whose context this rpc call is made
+ * @param protocol - the protocol name (the class of the client proxy
+ * used to make calls to the rpc server).
+ * @param rpcRequest - the deserialized rpc request
+ * @param receiveTime time at which the call was received (for metrics)
+ * @return the call's return
+ * @throws IOException
+ **/
+ public Writable call(Server server, String protocol,
+ Writable rpcRequest, long receiveTime) throws IOException ;
+ }
+
static final Log LOG = LogFactory.getLog(RPC.class);
+ /**
+ * Get all superInterfaces that extend VersionedProtocol
+ * @param childInterfaces
+ * @return the super interfaces that extend VersionedProtocol
+ */
+ static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+ List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+ for (Class<?> childInterface : childInterfaces) {
+ if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+ allInterfaces.add(childInterface);
+ allInterfaces.addAll(
+ Arrays.asList(
+ getSuperInterfaces(childInterface.getInterfaces())));
+ } else {
+ LOG.warn("Interface " + childInterface +
+ " ignored because it does not extend VersionedProtocol");
+ }
+ }
+ return allInterfaces.toArray(new Class[allInterfaces.size()]);
+ }
+
+ /**
+ * Get all interfaces implemented or extended by the given protocol
+ * that are, or extend, VersionedProtocol.
+ */
+ static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+ Class<?>[] interfaces = protocol.getInterfaces();
+ return getSuperInterfaces(interfaces);
+ }
/**
* Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
if (protocol == null) {
return null;
}
- ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+ ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
return (anno == null) ? protocol.getName() : anno.protocolName();
}
+
+ /**
+ * Get the protocol version from protocol class.
+ * If the protocol class has a ProtocolInfo annotation whose protocolVersion
+ * is not -1, that version is used; otherwise the versionID field is read.
+ */
+ static public long getProtocolVersion(Class<?> protocol) {
+ if (protocol == null) {
+ throw new IllegalArgumentException("Null protocol");
+ }
+ long version;
+ ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+ if (anno != null) {
+ version = anno.protocolVersion();
+ if (version != -1)
+ return version;
+ }
+ try {
+ Field versionField = protocol.getField("versionID");
+ versionField.setAccessible(true);
+ return versionField.getLong(protocol);
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
private RPC() {} // no public ctor
@@ -589,6 +667,144 @@ public class RPC {
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
+ boolean verbose;
+ static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length-1];
+ }
+
+ /**
+ * Store a map of protocol and version to its implementation
+ */
+ /**
+ * The key in Map
+ */
+ static class ProtoNameVer {
+ final String protocol;
+ final long version;
+ ProtoNameVer(String protocol, long ver) {
+ this.protocol = protocol;
+ this.version = ver;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (this == o)
+ return true;
+ if (! (o instanceof ProtoNameVer))
+ return false;
+ ProtoNameVer pv = (ProtoNameVer) o;
+ return ((pv.protocol.equals(this.protocol)) &&
+ (pv.version == this.version));
+ }
+ @Override
+ public int hashCode() {
+ return protocol.hashCode() * 37 + (int) version;
+ }
+ }
+
+ /**
+ * The value in map
+ */
+ static class ProtoClassProtoImpl {
+ final Class<?> protocolClass;
+ final Object protocolImpl;
+ ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+ this.protocolClass = protocolClass;
+ this.protocolImpl = protocolImpl;
+ }
+ }
+
+ ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =
+ new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+
+ Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+ if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+ for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+ protocolImplMapArray.add(
+ new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+ }
+ }
+ return protocolImplMapArray.get(rpcKind.ordinal());
+ }
+
+ // Register protocol and its impl for rpc calls
+ void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
+ Object protocolImpl) throws IOException {
+ String protocolName = RPC.getProtocolName(protocolClass);
+ long version;
+
+
+ try {
+ version = RPC.getProtocolVersion(protocolClass);
+ } catch (Exception ex) {
+ LOG.warn("Protocol " + protocolClass +
+ " NOT registered as cannot get protocol version ");
+ return;
+ }
+
+
+ getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+ new ProtoClassProtoImpl(protocolClass, protocolImpl));
+ LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName + " version=" + version +
+ " ProtocolImpl=" + protocolImpl.getClass().getName() +
+ " protocolClass=" + protocolClass.getName());
+ }
+
+ static class VerProtocolImpl {
+ final long version;
+ final ProtoClassProtoImpl protocolTarget;
+ VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+ this.version = ver;
+ this.protocolTarget = protocolTarget;
+ }
+ }
+
+
+ @SuppressWarnings("unused") // will be useful later.
+ VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+ String protocolName) {
+ VerProtocolImpl[] resultk =
+ new VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+ int i = 0;
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+ getProtocolImplMap(rpcKind).entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ resultk[i++] =
+ new VerProtocolImpl(pv.getKey().version, pv.getValue());
+ }
+ }
+ if (i == 0) {
+ return null;
+ }
+ VerProtocolImpl[] result = new VerProtocolImpl[i];
+ System.arraycopy(resultk, 0, result, 0, i);
+ return result;
+ }
+
+ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind,
+ String protocolName) {
+ Long highestVersion = 0L;
+ ProtoClassProtoImpl highest = null;
+ System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+ getProtocolImplMap(rpcKind).entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ if ((highest == null) || (pv.getKey().version > highestVersion)) {
+ highest = pv.getValue();
+ highestVersion = pv.getKey().version;
+ }
+ }
+ }
+ if (highest == null) {
+ return null;
+ }
+ return new VerProtocolImpl(highestVersion, highest);
+ }
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
@@ -605,11 +821,17 @@ public class RPC {
* @param protocolImpl - the impl of the protocol that will be called
* @return the server (for convenience)
*/
- public <PROTO, IMPL extends PROTO>
- Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
- ) throws IOException {
- throw new IOException("addProtocol Not Implemented");
+ public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+ Object protocolImpl) throws IOException {
+ registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+ return this;
+ }
+
+ @Override
+ public Writable call(RpcKind rpcKind, String protocol,
+ Writable rpcRequest, long receiveTime) throws IOException {
+ return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+ receiveTime);
}
}
-
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Thu Jan 12 02:54:01 2012
@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements
}
public enum RpcKind {
- RPC_BUILTIN ((short ) 1), // Used for built in calls
- RPC_WRITABLE ((short ) 2),
- RPC_PROTOCOL_BUFFER ((short)3),
- RPC_AVRO ((short)4);
-
+ RPC_BUILTIN ((short) 1), // Used for built in calls by tests
+ RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
+ RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+ RPC_AVRO ((short) 4); // Use AvroRpcEngine
+ static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+ private static final short FIRST_INDEX = RPC_BUILTIN.value;
private final short value;
- private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
RpcKind(short val) {
this.value = val;
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Jan 12 02:54:01 2012
@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteCha
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -67,6 +68,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -117,6 +119,59 @@ public abstract class Server {
* Initial and max size of response buffer
*/
static int INITIAL_RESP_BUF_SIZE = 10240;
+
+ static class RpcKindMapValue {
+ final Class<? extends Writable> rpcRequestWrapperClass;
+ final RpcInvoker rpcInvoker;
+ RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+ RpcInvoker rpcInvoker) {
+ this.rpcInvoker = rpcInvoker;
+ this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+ }
+ }
+ static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+ HashMap<RpcKind, RpcKindMapValue>(4);
+
+
+
+ /**
+ * Register a RPC kind and the class to deserialize the rpc request.
+ *
+ * Called by static initializers of rpcKind Engines
+ * @param rpcKind
+ * @param rpcRequestWrapperClass - this class is used to deserialize
+ * the rpc request.
+ * @param rpcInvoker - used to process the calls on the server side.
+ */
+
+ public static void registerProtocolEngine(RpcKind rpcKind,
+ Class<? extends Writable> rpcRequestWrapperClass,
+ RpcInvoker rpcInvoker) {
+ RpcKindMapValue old =
+ rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+ if (old != null) {
+ rpcKindMap.put(rpcKind, old);
+ throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+ rpcKind);
+ }
+ LOG.info("rpcKind=" + rpcKind +
+ ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
+ ", rpcInvoker=" + rpcInvoker);
+ }
+
+ public Class<? extends Writable> getRpcRequestWrapper(
+ RpcKind rpcKind) {
+ if (rpcRequestClass != null)
+ return rpcRequestClass;
+ RpcKindMapValue val = rpcKindMap.get(rpcKind);
+ return (val == null) ? null : val.rpcRequestWrapperClass;
+ }
+
+ public static RpcInvoker getRpcInvoker(RpcKind rpcKind) {
+ RpcKindMapValue val = rpcKindMap.get(rpcKind);
+ return (val == null) ? null : val.rpcInvoker;
+ }
+
public static final Log LOG = LogFactory.getLog(Server.class);
public static final Log AUDITLOG =
@@ -181,7 +236,7 @@ public abstract class Server {
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
- private Class<? extends Writable> paramClass; // class of call parameters
+ private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
private int thresholdIdleConnections; // the number of idle connections
@@ -1394,9 +1449,27 @@ public abstract class Server {
throw new IOException("IPC Server does not implement operation" +
header.getOperation());
}
+ // If we know the rpc kind, get its class so that we can deserialize
+ // (Note it would make more sense to have the handler deserialize but
+ // we continue with this original design.)
+ Class<? extends Writable> rpcRequestClass =
+ getRpcRequestWrapper(header.getkind());
+ if (rpcRequestClass == null) {
+ LOG.warn("Unknown rpc kind " + header.getkind() +
+ " from client " + getHostAddress());
+ final Call readParamsFailedCall =
+ new Call(header.getCallId(), null, this);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+ setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ IOException.class.getName(),
+ "Unknown rpc kind " + header.getkind());
+ responder.doRespond(readParamsFailedCall);
+ return;
+ }
Writable rpcRequest;
try { //Read the rpc request
- rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+ rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
@@ -1488,7 +1561,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
- value = call(call.connection.protocolName, call.rpcRequest,
+ value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
@@ -1497,7 +1570,7 @@ public abstract class Server {
@Override
public Writable run() throws Exception {
// make the call
- return call(call.connection.protocolName,
+ return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
@@ -1550,24 +1623,33 @@ public abstract class Server {
Configuration conf)
throws IOException
{
- this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+ this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+ .toString(port), null);
}
- /** Constructs a server listening on the named port and address. Parameters passed must
+ /**
+ * Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls.
* If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
* from configuration. Otherwise the configuration will be picked up.
+ *
+ * If rpcRequestClass is null then the rpcRequestClass must have been
+ * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+ * Class, RPC.RpcInvoker)}
+ * This parameter has been retained for compatibility with existing tests
+ * and usage.
*/
@SuppressWarnings("unchecked")
- protected Server(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
- Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
+ protected Server(String bindAddress, int port,
+ Class<? extends Writable> rpcRequestClass, int handlerCount,
+ int numReaders, int queueSizePerHandler, Configuration conf,
+ String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
- this.paramClass = paramClass;
+ this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
if (queueSizePerHandler != -1) {
@@ -1765,17 +1847,17 @@ public abstract class Server {
/**
* Called for each call.
- * @deprecated Use {@link #call(String, Writable, long)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, String,
+ * Writable, long)} instead
*/
@Deprecated
public Writable call(Writable param, long receiveTime) throws IOException {
- return call(null, param, receiveTime);
+ return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
}
/** Called for each call. */
- public abstract Writable call(String protocol,
- Writable param, long receiveTime)
- throws IOException;
+ public abstract Writable call(RpcKind rpcKind, String protocol,
+ Writable param, long receiveTime) throws IOException;
/**
* Authorize the incoming client connection.
@@ -1925,5 +2007,5 @@ public abstract class Server {
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
- }
+ }
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.ipc;
-import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTarge
import java.net.InetSocketAddress;
import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
-
- /**
- * Get all superInterfaces that extend VersionedProtocol
- * @param childInterfaces
- * @return the super interfaces that extend VersionedProtocol
- */
- private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
- List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
- for (Class<?> childInterface : childInterfaces) {
- if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
- allInterfaces.add(childInterface);
- allInterfaces.addAll(
- Arrays.asList(
- getSuperInterfaces(childInterface.getInterfaces())));
- } else {
- LOG.warn("Interface " + childInterface +
- " ignored because it does not extend VersionedProtocol");
- }
- }
- return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
- }
-
- /**
- * Get all interfaces that the given protocol implements or extends
- * which are assignable from VersionedProtocol.
- */
- private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
- Class<?>[] interfaces = protocol.getInterfaces();
- return getSuperInterfaces(interfaces);
+ static { // Register the rpcRequest deserializer for WritableRpcEngine
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+ Invocation.class, new Server.WritableRpcInvoker());
}
@@ -120,15 +88,7 @@ public class WritableRpcEngine implement
clientVersion = 0;
clientMethodsHash = 0;
} else {
- try {
- Field versionField = method.getDeclaringClass().getField("versionID");
- versionField.setAccessible(true);
- this.clientVersion = versionField.getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
+ this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
@@ -329,140 +289,25 @@ public class WritableRpcEngine implement
/** An RPC Server. */
public static class Server extends RPC.Server {
- private boolean verbose;
-
/**
- * The key in Map
- */
- static class ProtoNameVer {
- final String protocol;
- final long version;
- ProtoNameVer(String protocol, long ver) {
- this.protocol = protocol;
- this.version = ver;
- }
- @Override
- public boolean equals(Object o) {
- if (o == null)
- return false;
- if (this == o)
- return true;
- if (! (o instanceof ProtoNameVer))
- return false;
- ProtoNameVer pv = (ProtoNameVer) o;
- return ((pv.protocol.equals(this.protocol)) &&
- (pv.version == this.version));
- }
- @Override
- public int hashCode() {
- return protocol.hashCode() * 37 + (int) version;
- }
- }
-
- /**
- * The value in map
- */
- static class ProtoClassProtoImpl {
- final Class<?> protocolClass;
- final Object protocolImpl;
- ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
- this.protocolClass = protocolClass;
- this.protocolImpl = protocolImpl;
- }
- }
-
- private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap =
- new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-
- // Register protocol and its impl for rpc calls
- private void registerProtocolAndImpl(Class<?> protocolClass,
- Object protocolImpl) throws IOException {
- String protocolName = RPC.getProtocolName(protocolClass);
- VersionedProtocol vp = (VersionedProtocol) protocolImpl;
- long version;
- try {
- version = vp.getProtocolVersion(protocolName, 0);
- } catch (Exception ex) {
- LOG.warn("Protocol " + protocolClass +
- " NOT registered as getProtocolVersion throws exception ");
- return;
- }
- protocolImplMap.put(new ProtoNameVer(protocolName, version),
- new ProtoClassProtoImpl(protocolClass, protocolImpl));
- LOG.info("Protocol Name = " + protocolName + " version=" + version +
- " ProtocolImpl=" + protocolImpl.getClass().getName() +
- " protocolClass=" + protocolClass.getName());
- }
-
- private static class VerProtocolImpl {
- final long version;
- final ProtoClassProtoImpl protocolTarget;
- VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
- this.version = ver;
- this.protocolTarget = protocolTarget;
- }
- }
-
-
- @SuppressWarnings("unused") // will be useful later.
- private VerProtocolImpl[] getSupportedProtocolVersions(
- String protocolName) {
- VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()];
- int i = 0;
- for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
- protocolImplMap.entrySet()) {
- if (pv.getKey().protocol.equals(protocolName)) {
- resultk[i++] =
- new VerProtocolImpl(pv.getKey().version, pv.getValue());
- }
- }
- if (i == 0) {
- return null;
- }
- VerProtocolImpl[] result = new VerProtocolImpl[i];
- System.arraycopy(resultk, 0, result, 0, i);
- return result;
- }
-
- private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {
- Long highestVersion = 0L;
- ProtoClassProtoImpl highest = null;
- for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
- .entrySet()) {
- if (pv.getKey().protocol.equals(protocolName)) {
- if ((highest == null) || (pv.getKey().version > highestVersion)) {
- highest = pv.getValue();
- highestVersion = pv.getKey().version;
- }
- }
- }
- if (highest == null) {
- return null;
- }
- return new VerProtocolImpl(highestVersion, highest);
- }
-
-
- /** Construct an RPC server.
+ * Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*
- * @deprecated Use #Server(Class, Object, Configuration, String, int)
- *
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
*/
@Deprecated
public Server(Object instance, Configuration conf, String bindAddress,
- int port)
- throws IOException {
+ int port) throws IOException {
this(null, instance, conf, bindAddress, port);
}
/** Construct an RPC server.
- * @param protocol class
- * @param instance the instance whose methods will be called
+ * @param protocolClass class
+ * @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implement
false, null);
}
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length-1];
- }
-
-
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implement
}
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implement
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+ super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager);
@@ -535,7 +373,7 @@ public class WritableRpcEngine implement
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
- protocols = getProtocolInterfaces(protocolImpl.getClass());
+ protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implement
protocolImpl.getClass());
}
// register protocol class and its super interfaces
- registerProtocolAndImpl(protocolClass, protocolImpl);
- protocols = getProtocolInterfaces(protocolClass);
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+ protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
- registerProtocolAndImpl(p, protocolImpl);
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}
}
-
- @Override
- public <PROTO, IMPL extends PROTO> Server
- addProtocol(
- Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
- registerProtocolAndImpl(protocolClass, protocolImpl);
- return this;
+ private static void log(String value) {
+ if (value!= null && value.length() > 55)
+ value = value.substring(0, 55)+"...";
+ LOG.info(value);
}
- /**
- * Process a client call
- * @param protocolName - the protocol name (the class of the client proxy
- * used to make calls to the rpc server.
- * @param param parameters
- * @param receivedTime time at which the call receoved (for metrics)
- * @return the call's return
- * @throws IOException
- */
- public Writable call(String protocolName, Writable param, long receivedTime)
- throws IOException {
- try {
- Invocation call = (Invocation)param;
- if (verbose) log("Call: " + call);
-
- // Verify rpc version
- if (call.getRpcVersion() != writableRpcVersion) {
- // Client is using a different version of WritableRpc
- throw new IOException(
- "WritableRpc version mismatch, client side version="
- + call.getRpcVersion() + ", server side version="
- + writableRpcVersion);
- }
+ static class WritableRpcInvoker implements RpcInvoker {
+
+ @Override
+ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+ String protocolName, Writable rpcRequest, long receivedTime)
+ throws IOException {
+ try {
+ Invocation call = (Invocation)rpcRequest;
+ if (server.verbose) log("Call: " + call);
- long clientVersion = call.getProtocolVersion();
- final String protoName;
- ProtoClassProtoImpl protocolImpl;
- if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
- // VersionProtocol methods are often used by client to figure out
- // which version of protocol to use.
- //
- // Versioned protocol methods should go the protocolName protocol
- // rather than the declaring class of the method since the
- // the declaring class is VersionedProtocol which is not
- // registered directly.
- // Send the call to the highest protocol version
- protocolImpl =
- getHighestSupportedProtocol(protocolName).protocolTarget;
- } else {
- protoName = call.declaringClassProtocolName;
-
- // Find the right impl for the protocol based on client version.
- ProtoNameVer pv =
- new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
- protocolImpl = protocolImplMap.get(pv);
- if (protocolImpl == null) { // no match for Protocol AND Version
- VerProtocolImpl highest =
- getHighestSupportedProtocol(protoName);
+ // Verify rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new IOException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
+
+ long clientVersion = call.getProtocolVersion();
+ final String protoName;
+ ProtoClassProtoImpl protocolImpl;
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+ // VersionedProtocol methods are often used by the client to figure out
+ // which version of the protocol to use.
+ //
+ // VersionedProtocol methods should go to the protocolName protocol
+ // rather than the declaring class of the method, since the
+ // declaring class is VersionedProtocol, which is not
+ // registered directly.
+ // Send the call to the highest protocol version
+ VerProtocolImpl highest = server.getHighestSupportedProtocol(
+ RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
- throw new IOException("Unknown protocol: " + protoName);
- } else { // protocol supported but not the version that client wants
- throw new RPC.VersionMismatch(protoName, clientVersion,
- highest.version);
+ throw new IOException("Unknown protocol: " + protocolName);
+ }
+ protocolImpl = highest.protocolTarget;
+ } else {
+ protoName = call.declaringClassProtocolName;
+
+ // Find the right impl for the protocol based on client version.
+ ProtoNameVer pv =
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+ protocolImpl =
+ server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE,
+ protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ } else { // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
}
}
- }
-
+
- // Invoke the protocol method
+ // Invoke the protocol method
- long startTime = System.currentTimeMillis();
- Method method =
- protocolImpl.protocolClass.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
- rpcDetailedMetrics.init(protocolImpl.protocolClass);
- Object value =
- method.invoke(protocolImpl.protocolImpl, call.getParameters());
- int processingTime = (int) (System.currentTimeMillis() - startTime);
- int qTime = (int) (startTime-receivedTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + call.getMethodName() +
- " queueTime= " + qTime +
- " procesingTime= " + processingTime);
- }
- rpcMetrics.addRpcQueueTime(qTime);
- rpcMetrics.addRpcProcessingTime(processingTime);
- rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
- processingTime);
- if (verbose) log("Return: "+value);
-
- return new ObjectWritable(method.getReturnType(), value);
-
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- throw (IOException)target;
- } else {
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
+ long startTime = System.currentTimeMillis();
+ Method method =
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+ server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ Object value =
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime-receivedTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Served: " + call.getMethodName() +
+ " queueTime= " + qTime +
+ " procesingTime= " + processingTime);
+ }
+ server.rpcMetrics.addRpcQueueTime(qTime);
+ server.rpcMetrics.addRpcProcessingTime(processingTime);
+ server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+ processingTime);
+ if (server.verbose) log("Return: "+value);
+
+ return new ObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
}
}
}
-
- private static void log(String value) {
- if (value!= null && value.length() > 55)
- value = value.substring(0, 55)+"...";
- LOG.info(value);
- }
}