You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/01/07 18:59:31 UTC
svn commit: r1228685 - in
/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common:
./ src/main/java/ src/main/java/org/apache/hadoop/ipc/
src/test/java/org/apache/hadoop/ipc/
Author: szetszwo
Date: Sat Jan 7 17:59:30 2012
New Revision: 1228685
URL: http://svn.apache.org/viewvc?rev=1228685&view=rev
Log:
svn merge -c 1177399 from trunk for HADOOP-7693.
Modified:
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jan 7 17:59:30 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1228685&r1=1228684&r2=1228685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt Sat Jan 7 17:59:30 2012
@@ -10,6 +10,9 @@ Release 0.23-PB - Unreleased
HADOOP-7687 Make getProtocolSignature public (sanjay)
+ HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
+ interface introduced in HADOOP-7524. (cutting)
+
Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jan 7 17:59:30 2012
@@ -1,5 +1,5 @@
/hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
/hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278
Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jan 7 17:59:30 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177487,1177531,1177859,1177864,1182641,1183132,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1182641,1183132,1189932,1189982,1190109,1195575,1195760,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
/hadoop/core/branches/branch-0.19/core/src/java:713112
/hadoop/core/trunk/src/core:776175-785643,785929-786278
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1228685&r1=1228684&r2=1228685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Sat Jan 7 17:59:30 2012
@@ -29,6 +29,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import javax.net.SocketFactory;
@@ -54,7 +56,7 @@ import org.apache.hadoop.security.token.
public class AvroRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
- private static int VERSION = 0;
+ private static int VERSION = 1;
// the implementation we tunnel through
private static final RpcEngine ENGINE = new WritableRpcEngine();
@@ -62,9 +64,10 @@ public class AvroRpcEngine implements Rp
/** Tunnel an Avro RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol {
//WritableRpcEngine expects a versionID in every protocol.
- public static final long versionID = 0L;
+ public static final long versionID = VERSION;
/** All Avro methods and responses go through this. */
- BufferListWritable call(BufferListWritable request) throws IOException;
+ BufferListWritable call(String protocol, BufferListWritable request)
+ throws IOException;
}
/** A Writable that holds a List<ByteBuffer>, The Avro RPC Transceiver's
@@ -103,23 +106,25 @@ public class AvroRpcEngine implements Rp
private static class ClientTransceiver extends Transceiver {
private TunnelProtocol tunnel;
private InetSocketAddress remote;
+ private String protocol;
public ClientTransceiver(InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
- int rpcTimeout)
+ int rpcTimeout, String protocol)
throws IOException {
this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
addr, ticket, conf, factory,
rpcTimeout).getProxy();
this.remote = addr;
+ this.protocol = protocol;
}
public String getRemoteName() { return remote.toString(); }
public List<ByteBuffer> transceive(List<ByteBuffer> request)
throws IOException {
- return tunnel.call(new BufferListWritable(request)).buffers;
+ return tunnel.call(protocol, new BufferListWritable(request)).buffers;
}
public List<ByteBuffer> readBuffers() throws IOException {
@@ -159,7 +164,8 @@ public class AvroRpcEngine implements Rp
UserGroupInformation ticket, Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
- this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
+ this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout,
+ protocol.getName());
this.requestor = createRequestor(protocol, tx);
}
@Override public Object invoke(Object proxy, Method method, Object[] args)
@@ -182,9 +188,11 @@ public class AvroRpcEngine implements Rp
/** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
private class TunnelResponder implements TunnelProtocol {
- private Responder responder;
- public TunnelResponder(Class<?> iface, Object impl) {
- responder = createResponder(iface, impl);
+ private Map<String, Responder> responders =
+ new HashMap<String, Responder>();
+
+ public void addProtocol(Class<?> iface, Object impl) {
+ responders.put(iface.getName(), createResponder(iface, impl));
}
@Override
@@ -197,13 +205,18 @@ public class AvroRpcEngine implements Rp
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
- return new ProtocolSignature(VERSION, null);
+ return ProtocolSignature.getProtocolSignature
+ (clientMethodsHashCode, VERSION, TunnelProtocol.class);
}
- public BufferListWritable call(final BufferListWritable request)
+ public BufferListWritable call(String protocol, BufferListWritable request)
throws IOException {
+ Responder responder = responders.get(protocol);
+ if (responder == null)
+ throw new IOException("No responder for: "+protocol);
return new BufferListWritable(responder.respond(request.buffers));
}
+
}
public Object[] call(Method method, Object[][] params,
@@ -212,6 +225,32 @@ public class AvroRpcEngine implements Rp
throw new UnsupportedOperationException();
}
+ private class Server extends WritableRpcEngine.Server {
+ private TunnelResponder responder = new TunnelResponder();
+
+ public Server(Class<?> iface, Object impl, String bindAddress,
+ int port, int numHandlers, int numReaders,
+ int queueSizePerHandler, boolean verbose,
+ Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager
+ ) throws IOException {
+ super((Class)null, new Object(), conf,
+ bindAddress, port, numHandlers, numReaders,
+ queueSizePerHandler, verbose, secretManager);
+ super.addProtocol(TunnelProtocol.class, responder);
+ responder.addProtocol(iface, impl);
+ }
+
+
+ @Override
+ public <PROTO, IMPL extends PROTO> Server
+ addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+ throws IOException {
+ responder.addProtocol(protocolClass, protocolImpl);
+ return this;
+ }
+ }
+
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
@@ -220,10 +259,9 @@ public class AvroRpcEngine implements Rp
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException {
- return ENGINE.getServer(TunnelProtocol.class,
- new TunnelResponder(iface, impl),
- bindAddress, port, numHandlers, numReaders,
- queueSizePerHandler, verbose, conf, secretManager);
+ return new Server
+ (iface, impl, bindAddress, port, numHandlers, numReaders,
+ queueSizePerHandler, verbose, conf, secretManager);
}
}
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1228685&r1=1228684&r2=1228685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sat Jan 7 17:59:30 2012
@@ -605,7 +605,7 @@ public class RPC {
* @param protocolImpl - the impl of the protocol that will be called
* @return the server (for convenience)
*/
- public <PROTO extends VersionedProtocol, IMPL extends PROTO>
+ public <PROTO, IMPL extends PROTO>
Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
) throws IOException {
throw new IOException("addProtocol Not Implemented");
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1228685&r1=1228684&r2=1228685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sat Jan 7 17:59:30 2012
@@ -555,7 +555,7 @@ public class WritableRpcEngine implement
@Override
- public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
+ public <PROTO, IMPL extends PROTO> Server
addProtocol(
Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
registerProtocolAndImpl(protocolClass, protocolImpl);
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java?rev=1228685&r1=1228684&r2=1228685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java Sat Jan 7 17:59:30 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
/** Unit tests for AvroRpc. */
public class TestAvroRpc extends TestCase {
@@ -56,6 +57,9 @@ public class TestAvroRpc extends TestCas
public TestAvroRpc(String name) { super(name); }
+ public static interface EmptyProtocol {}
+ public static class EmptyImpl implements EmptyProtocol {}
+
public static class TestImpl implements AvroTestProtocol {
public void ping() {}
@@ -93,10 +97,12 @@ public class TestAvroRpc extends TestCas
sm = new TestTokenSecretManager();
}
UserGroupInformation.setConfiguration(conf);
+ RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class);
RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
- Server server = RPC.getServer(AvroTestProtocol.class,
- new TestImpl(), ADDRESS, 0, 5, true,
- conf, sm);
+ RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
+ ADDRESS, 0, 5, true, conf, sm);
+ server.addProtocol(AvroTestProtocol.class, new TestImpl());
+
try {
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);