You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 05:54:41 UTC
svn commit: r1294028 [2/2] - in
/hadoop/common/branches/branch-0.23/hadoop-common-project: ./ hadoop-auth/
hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/main/docs/
hadoop-common/src/main/java/ hadoop-common/src/main/java/org/apache/hadoop...
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Mon Feb 27 04:54:33 2012
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
* @return the version that the server will speak
* @throws IOException if any IO error occurs
*/
- @Deprecated
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Feb 27 04:54:33 2012
@@ -18,23 +18,23 @@
package org.apache.hadoop.ipc;
-import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
-import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,8 +49,38 @@ public class WritableRpcEngine implement
//writableRpcVersion should be updated if there is a change
//in format of the rpc messages.
- public static long writableRpcVersion = 1L;
+
+ // 2L - added declared class to Invocation
+ public static final long writableRpcVersion = 2L;
+
+ /**
+ * Whether or not this class has been initialized.
+ */
+ private static boolean isInitialized = false;
+
+ static {
+ ensureInitialized();
+ }
+
+ /**
+ * Initialize this class if it isn't already.
+ */
+ public static synchronized void ensureInitialized() {
+ if (!isInitialized) {
+ initialize();
+ }
+ }
+
+ /**
+ * Register the rpcRequest deserializer for WritableRpcEngine
+ */
+ private static synchronized void initialize() {
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+ Invocation.class, new Server.WritableRpcInvoker());
+ isInitialized = true;
+ }
+
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
@@ -59,11 +89,13 @@ public class WritableRpcEngine implement
private Configuration conf;
private long clientVersion;
private int clientMethodsHash;
+ private String declaringClassProtocolName;
//This could be different from static writableRpcVersion when received
//at server, if client is using a different version.
private long rpcVersion;
+ @SuppressWarnings("unused") // called when deserializing an invocation
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
@@ -76,18 +108,12 @@ public class WritableRpcEngine implement
clientVersion = 0;
clientMethodsHash = 0;
} else {
- try {
- Field versionField = method.getDeclaringClass().getField("versionID");
- versionField.setAccessible(true);
- this.clientVersion = versionField.getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
+ this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
+ this.declaringClassProtocolName =
+ RPC.getProtocolName(method.getDeclaringClass());
}
/** The name of the method invoked. */
@@ -103,6 +129,7 @@ public class WritableRpcEngine implement
return clientVersion;
}
+ @SuppressWarnings("unused")
private int getClientMethodsHash() {
return clientMethodsHash;
}
@@ -115,8 +142,10 @@ public class WritableRpcEngine implement
return rpcVersion;
}
+ @SuppressWarnings("deprecation")
public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
+ declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in);
clientVersion = in.readLong();
clientMethodsHash = in.readInt();
@@ -124,13 +153,16 @@ public class WritableRpcEngine implement
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+ parameters[i] =
+ ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
+ @SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
+ UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
@@ -169,7 +201,7 @@ public class WritableRpcEngine implement
private static ClientCache CLIENTS=new ClientCache();
- private static class Invoker implements InvocationHandler {
+ private static class Invoker implements RpcInvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
@@ -191,7 +223,7 @@ public class WritableRpcEngine implement
}
ObjectWritable value = (ObjectWritable)
- client.call(new Invocation(method, args), remoteId);
+ client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -200,12 +232,17 @@ public class WritableRpcEngine implement
}
/* close the IPC client that's responsible for this invoker's RPCs */
- synchronized private void close() {
+ synchronized public void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
+
+ @Override
+ public ConnectionId getConnectionId() {
+ return remoteId;
+ }
}
// for unit testing only
@@ -231,15 +268,6 @@ public class WritableRpcEngine implement
factory, rpcTimeout));
return new ProtocolProxy<T>(protocol, proxy, true);
}
-
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public void stopProxy(Object proxy) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
-
/** Expert: Make multiple, parallel calls to a set of servers. */
public Object[] call(Method method, Object[][] params,
@@ -273,134 +301,238 @@ public class WritableRpcEngine implement
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
- public Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, Configuration conf,
+ public RPC.Server getServer(Class<?> protocolClass,
+ Object protocolImpl, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers,
- numReaders, queueSizePerHandler, verbose, secretManager);
+ return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
}
+
/** An RPC Server. */
public static class Server extends RPC.Server {
- private Object instance;
- private boolean verbose;
-
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
+ *
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
+ */
+ @Deprecated
+ public Server(Object instance, Configuration conf, String bindAddress,
+ int port) throws IOException {
+ this(null, instance, conf, bindAddress, port);
+ }
+
+
+ /** Construct an RPC server.
+ * @param protocolClass class
+ * @param protocolImpl the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port)
throws IOException {
- this(instance, conf, bindAddress, port, 1, -1, -1, false, null);
+ this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
+ false, null);
}
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length-1];
+ /**
+ * Construct an RPC server.
+ * @param protocolImpl the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ *
+ * @deprecated use Server#Server(Class, Object,
+ * Configuration, String, int, int, int, int, boolean, SecretManager)
+ */
+ @Deprecated
+ public Server(Object protocolImpl, Configuration conf, String bindAddress,
+ int port, int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ this(null, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose,
+ secretManager);
+
}
- /** Construct an RPC server.
- * @param instance the instance whose methods will be called
+ /**
+ * Construct an RPC server.
+ * @param protocolClass - the protocol being registered
+ * can be null for compatibility with old usage (see below for details)
+ * @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,
- SecretManager<? extends TokenIdentifier> secretManager)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+ super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
- classNameBase(instance.getClass().getName()), secretManager);
- this.instance = instance;
+ classNameBase(protocolImpl.getClass().getName()), secretManager);
+
this.verbose = verbose;
- }
+
+
+ Class<?>[] protocols;
+ if (protocolClass == null) { // derive protocol from impl
+ /*
+ * In order to remain compatible with the old usage where a single
+ * target protocolImpl is suppled for all protocol interfaces, and
+ * the protocolImpl is derived from the protocolClass(es)
+ * we register all interfaces extended by the protocolImpl
+ */
+ protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
- public Writable call(Class<?> protocol, Writable param, long receivedTime)
- throws IOException {
- try {
- Invocation call = (Invocation)param;
- if (verbose) log("Call: " + call);
-
- Method method = protocol.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
-
- // Verify rpc version
- if (call.getRpcVersion() != writableRpcVersion) {
- // Client is using a different version of WritableRpc
- throw new IOException(
- "WritableRpc version mismatch, client side version="
- + call.getRpcVersion() + ", server side version="
- + writableRpcVersion);
+ } else {
+ if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+ throw new IOException("protocolClass "+ protocolClass +
+ " is not implemented by protocolImpl which is of class " +
+ protocolImpl.getClass());
}
-
- //Verify protocol version.
- //Bypass the version check for VersionedProtocol
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ // register protocol class and its super interfaces
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+ protocols = RPC.getProtocolInterfaces(protocolClass);
+ }
+ for (Class<?> p : protocols) {
+ if (!p.equals(VersionedProtocol.class)) {
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
+ }
+ }
+
+ }
+
+ private static void log(String value) {
+ if (value!= null && value.length() > 55)
+ value = value.substring(0, 55)+"...";
+ LOG.info(value);
+ }
+
+ static class WritableRpcInvoker implements RpcInvoker {
+
+ @Override
+ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+ String protocolName, Writable rpcRequest, long receivedTime)
+ throws IOException {
+ try {
+ Invocation call = (Invocation)rpcRequest;
+ if (server.verbose) log("Call: " + call);
+
+ // Verify rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new IOException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
+
long clientVersion = call.getProtocolVersion();
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
- .getProtocolSignature(protocol.getCanonicalName(), call
- .getProtocolVersion(), call.getClientMethodsHash());
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- LOG.warn("Version mismatch: client version=" + clientVersion
- + ", server version=" + serverVersion);
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
+ final String protoName;
+ ProtoClassProtoImpl protocolImpl;
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+ // VersionProtocol methods are often used by client to figure out
+ // which version of protocol to use.
+ //
+ // Versioned protocol methods should go the protocolName protocol
+ // rather than the declaring class of the method since the
+ // the declaring class is VersionedProtocol which is not
+ // registered directly.
+ // Send the call to the highest protocol version
+ VerProtocolImpl highest = server.getHighestSupportedProtocol(
+ RpcKind.RPC_WRITABLE, protocolName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protocolName);
+ }
+ protocolImpl = highest.protocolTarget;
+ } else {
+ protoName = call.declaringClassProtocolName;
+
+ // Find the right impl for the protocol based on client version.
+ ProtoNameVer pv =
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+ protocolImpl =
+ server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE,
+ protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ } else { // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
+ }
}
- }
+
- long startTime = System.currentTimeMillis();
- Object value = method.invoke(instance, call.getParameters());
- int processingTime = (int) (System.currentTimeMillis() - startTime);
- int qTime = (int) (startTime-receivedTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + call.getMethodName() +
- " queueTime= " + qTime +
- " procesingTime= " + processingTime);
- }
- rpcMetrics.addRpcQueueTime(qTime);
- rpcMetrics.addRpcProcessingTime(processingTime);
- rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
- processingTime);
- if (verbose) log("Return: "+value);
-
- return new ObjectWritable(method.getReturnType(), value);
-
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- throw (IOException)target;
- } else {
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
+ // Invoke the protocol method
+
+ long startTime = System.currentTimeMillis();
+ Method method =
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+ server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ Object value =
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime-receivedTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Served: " + call.getMethodName() +
+ " queueTime= " + qTime +
+ " procesingTime= " + processingTime);
+ }
+ server.rpcMetrics.addRpcQueueTime(qTime);
+ server.rpcMetrics.addRpcProcessingTime(processingTime);
+ server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+ processingTime);
+ if (server.verbose) log("Return: "+value);
+
+ return new ObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
}
}
}
- private static void log(String value) {
- if (value!= null && value.length() > 55)
- value = value.substring(0, 55)+"...";
- LOG.info(value);
+ @Override
+ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+ ConnectionId connId, Configuration conf, SocketFactory factory)
+ throws IOException {
+ throw new UnsupportedOperationException("This proxy is not supported");
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java Mon Feb 27 04:54:33 2012
@@ -42,15 +42,20 @@ public class DelegationKey implements Wr
@Nullable
private byte[] keyBytes = null;
+ /** Default constructore required for Writable */
public DelegationKey() {
- this(0, 0L, null);
+ this(0, 0L, (SecretKey)null);
}
public DelegationKey(int keyId, long expiryDate, SecretKey key) {
+ this(keyId, expiryDate, key != null ? key.getEncoded() : null);
+ }
+
+ public DelegationKey(int keyId, long expiryDate, byte[] encodedKey) {
this.keyId = keyId;
this.expiryDate = expiryDate;
- if (key!=null) {
- this.keyBytes = key.getEncoded();
+ if (encodedKey != null) {
+ this.keyBytes = encodedKey;
}
}
@@ -70,6 +75,10 @@ public class DelegationKey implements Wr
return key;
}
}
+
+ public byte[] getEncodedKey() {
+ return keyBytes;
+ }
public void setExpiryDate(long expiryDate) {
this.expiryDate = expiryDate;
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java Mon Feb 27 04:54:33 2012
@@ -94,7 +94,7 @@ public abstract class GetGroupsBase exte
* @return A {@link GetUserMappingsProtocol} client proxy.
* @throws IOException
*/
- private GetUserMappingsProtocol getUgmProtocol() throws IOException {
+ protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
GetUserMappingsProtocol userGroupMappingProtocol =
RPC.getProxy(GetUserMappingsProtocol.class,
GetUserMappingsProtocol.versionID,
Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 27 04:54:33 2012
@@ -1,3 +1,4 @@
+/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/core:1227776-1294021
/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166009,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1183132,1189613,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456,1239752,1240897,1240928,1243065,1243104,1244766,1245751,1245762,1293419
/hadoop/core/branches/branch-0.19/core/src/test/core:713112
/hadoop/core/trunk/src/test/core:776175-785643,785929-786278
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Mon Feb 27 04:54:33 2012
@@ -57,6 +57,11 @@ public class TestFailoverProxy {
public Class<?> getInterface() {
return iface;
}
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to do.
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Feb 27 04:54:33 2012
@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
@@ -96,8 +97,8 @@ public class TestIPC {
}
@Override
- public Writable call(Class<?> protocol, Writable param, long receiveTime)
- throws IOException {
+ public Writable call(RpcKind rpcKind, String protocol, Writable param,
+ long receiveTime) throws IOException {
if (sleep) {
// sleep a bit
try {
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon Feb 27 04:54:33 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
/**
@@ -72,8 +73,8 @@ public class TestIPCServerResponder exte
}
@Override
- public Writable call(Class<?> protocol, Writable param, long receiveTime)
- throws IOException {
+ public Writable call(RpcKind rpcKind, String protocol, Writable param,
+ long receiveTime) throws IOException {
if (sleep) {
try {
Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Feb 27 04:54:33 2012
@@ -18,28 +18,39 @@
package org.apache.hadoop.ipc;
+import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Arrays;
-import junit.framework.TestCase;
+import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import static org.junit.Assert.*;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
@@ -49,18 +60,22 @@ import static org.apache.hadoop.test.Met
import static org.mockito.Mockito.*;
/** Unit tests for RPC. */
-public class TestRPC extends TestCase {
+@SuppressWarnings("deprecation")
+public class TestRPC {
private static final String ADDRESS = "0.0.0.0";
public static final Log LOG =
LogFactory.getLog(TestRPC.class);
private static Configuration conf = new Configuration();
+
+ static {
+ conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
+ StoppedRpcEngine.class, RpcEngine.class);
+ }
int datasize = 1024*100;
int numThreads = 50;
-
- public TestRPC(String name) { super(name); }
public interface TestProtocol extends VersionedProtocol {
public static final long versionID = 1L;
@@ -207,6 +222,80 @@ public class TestRPC extends TestCase {
}
}
+ /**
+ * A basic interface for testing client-side RPC resource cleanup.
+ */
+ private static interface StoppedProtocol {
+ long versionID = 0;
+
+ public void stop();
+ }
+
+ /**
+ * A class used for testing cleanup of client side RPC resources.
+ */
+ private static class StoppedRpcEngine implements RpcEngine {
+
+ @Override
+ public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+ UserGroupInformation ticket, Configuration conf)
+ throws IOException, InterruptedException {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+ new Class[] { protocol }, new StoppedInvocationHandler());
+ return new ProtocolProxy<T>(protocol, proxy, false);
+ }
+
+ @Override
+ public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+ Object instance, String bindAddress, int port, int numHandlers,
+ int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+ ConnectionId connId, Configuration conf, SocketFactory factory)
+ throws IOException {
+ throw new UnsupportedOperationException("This proxy is not supported");
+ }
+ }
+
+ /**
+ * An invocation handler which does nothing when invoking methods, and just
+ * counts the number of times close() is called.
+ */
+ private static class StoppedInvocationHandler
+ implements InvocationHandler, Closeable {
+
+ private int closeCalled = 0;
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCalled++;
+ }
+
+ public int getCloseCalled() {
+ return closeCalled;
+ }
+
+ }
+
+ @Test
public void testConfRpc() throws Exception {
Server server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, 1, false, conf, null);
@@ -229,6 +318,7 @@ public class TestRPC extends TestCase {
server.stop();
}
+ @Test
public void testSlowRpc() throws Exception {
System.out.println("Testing Slow RPC");
// create a server with two handlers
@@ -273,11 +363,12 @@ public class TestRPC extends TestCase {
}
}
- public void testRPCConf(Configuration conf) throws Exception {
-
+ @Test
+ public void testCalls() throws Exception {
+ testCallsInternal(conf);
}
-
- public void testCalls(Configuration conf) throws Exception {
+
+ private void testCallsInternal(Configuration conf) throws Exception {
Server server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, conf);
TestProtocol proxy = null;
@@ -384,6 +475,7 @@ public class TestRPC extends TestCase {
}
}
+ @Test
public void testStandaloneClient() throws IOException {
try {
TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
@@ -450,6 +542,7 @@ public class TestRPC extends TestCase {
}
}
+ @Test
public void testAuthorization() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -481,20 +574,48 @@ public class TestRPC extends TestCase {
Configuration conf = new Configuration();
conf.setBoolean("ipc.client.ping", false);
- new TestRPC("testnoPings").testCalls(conf);
+ new TestRPC().testCallsInternal(conf);
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
- new TestRPC("testnoPings").testCalls(conf);
+ new TestRPC().testCallsInternal(conf);
}
/**
* Test stopping a non-registered proxy
* @throws Exception
*/
+ @Test
public void testStopNonRegisteredProxy() throws Exception {
RPC.stopProxy(mock(TestProtocol.class));
}
+ @Test
+ public void testStopProxy() throws IOException {
+ StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+ StoppedProtocol.versionID, null, conf);
+ StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+ Proxy.getInvocationHandler(proxy);
+ assertEquals(invocationHandler.getCloseCalled(), 0);
+ RPC.stopProxy(proxy);
+ assertEquals(invocationHandler.getCloseCalled(), 1);
+ }
+
+ @Test
+ public void testWrappedStopProxy() throws IOException {
+ StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+ StoppedProtocol.versionID, null, conf);
+ StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+ Proxy.getInvocationHandler(wrappedProxy);
+
+ StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
+ wrappedProxy, RetryPolicies.RETRY_FOREVER);
+
+ assertEquals(invocationHandler.getCloseCalled(), 0);
+ RPC.stopProxy(proxy);
+ assertEquals(invocationHandler.getCloseCalled(), 1);
+ }
+
+ @Test
public void testErrorMsgForInsecureClient() throws Exception {
final Server server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -567,10 +688,10 @@ public class TestRPC extends TestCase {
return count;
}
-
/**
* Test that server.stop() properly stops all threads
*/
+ @Test
public void testStopsAllThreads() throws Exception {
int threadsBefore = countThreads("Server$Listener$Reader");
assertEquals("Expect no Reader threads running before test",
@@ -591,8 +712,7 @@ public class TestRPC extends TestCase {
}
public static void main(String[] args) throws Exception {
-
- new TestRPC("test").testCalls(conf);
+ new TestRPC().testCallsInternal(conf);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon Feb 27 04:54:33 2012
@@ -31,6 +31,10 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Test;
@@ -39,7 +43,7 @@ import org.junit.Test;
public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0";
private static InetSocketAddress addr;
- private static Server server;
+ private static RPC.Server server;
private ProtocolProxy<?> proxy;
public static final Log LOG =
@@ -52,10 +56,14 @@ public class TestRPCCompatibility {
void ping() throws IOException;
}
- public interface TestProtocol1 extends TestProtocol0 {
+ public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
String echo(String value) throws IOException;
}
+
+ // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name
+ @ProtocolInfo(protocolName=
+ "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol2 extends TestProtocol1 {
int echo(int value) throws IOException;
}
@@ -89,28 +97,44 @@ public class TestRPCCompatibility {
public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
@Override
public String echo(String value) { return value; }
+ @Override
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ return TestProtocol1.versionID;
+ }
}
public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
@Override
public int echo(int value) { return value; }
+
+ @Override
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ return TestProtocol2.versionID;
+ }
+
}
@After
public void tearDown() throws IOException {
if (proxy != null) {
RPC.stopProxy(proxy.getProxy());
+ proxy = null;
}
if (server != null) {
server.stop();
+ server = null;
}
}
@Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class,
- new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -154,8 +178,10 @@ public class TestRPCCompatibility {
public int echo(int value) throws IOException, NumberFormatException {
if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
return -value; // use version 3 echo long
} else { // server is version 2
+System.out.println("echo int is NOT supported");
return Integer.parseInt(proxy2.echo(String.valueOf(value)));
}
}
@@ -172,8 +198,10 @@ public class TestRPCCompatibility {
@Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class,
- new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -189,9 +217,12 @@ public class TestRPCCompatibility {
@Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception {
+ ProtocolSignature.resetCache();
// create a server with two handlers
+ TestImpl2 impl = new TestImpl2();
server = RPC.getServer(TestProtocol2.class,
- new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -250,14 +281,16 @@ public class TestRPCCompatibility {
assertEquals(hash1, hash2);
}
+ @ProtocolInfo(protocolName=
+ "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol4 extends TestProtocol2 {
- public static final long versionID = 1L;
+ public static final long versionID = 4L;
int echo(int value) throws IOException;
}
@Test
public void testVersionMismatch() throws IOException {
- server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+ server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -268,7 +301,76 @@ public class TestRPCCompatibility {
proxy.echo(21);
fail("The call must throw VersionMismatch exception");
} catch (IOException ex) {
- Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+ Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(),
+ ex.getMessage().contains("VersionMismatch"));
+ }
+ }
+
+ @Test
+ public void testIsMethodSupported() throws IOException {
+ server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
+ false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+ TestProtocol2.versionID, addr, conf);
+ boolean supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RpcKind.RPC_WRITABLE,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertTrue(supported);
+ supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertFalse(supported);
+ }
+
+ /**
+ * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+ * the server registry to extract protocol signatures and versions.
+ */
+ @Test
+ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+ TestImpl1 impl = new TestImpl1();
+ server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
+ conf, null);
+ server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+
+ ProtocolMetaInfoServerSideTranslatorPB xlator =
+ new ProtocolMetaInfoServerSideTranslatorPB(server);
+
+ GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RpcKind.RPC_PROTOCOL_BUFFER));
+ //No signatures should be found
+ Assert.assertEquals(0, resp.getProtocolSignatureCount());
+ resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RpcKind.RPC_WRITABLE));
+ Assert.assertEquals(1, resp.getProtocolSignatureCount());
+ ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+ Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+ boolean found = false;
+ int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+ .getMethod("echo", String.class));
+ for (int m : sig.getMethodsList()) {
+ if (expected == m) {
+ found = true;
+ break;
+ }
}
+ Assert.assertTrue(found);
+ }
+
+ private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+ Class<?> protocol, RpcKind rpcKind) {
+ GetProtocolSignatureRequestProto.Builder builder =
+ GetProtocolSignatureRequestProto.newBuilder();
+ builder.setProtocol(protocol.getName());
+ builder.setRpcKind(rpcKind.toString());
+ return builder.build();
}
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Mon Feb 27 04:54:33 2012
@@ -164,6 +164,10 @@ public abstract class MultithreadedTestU
}
checkException();
}
+
+ public Iterable<? extends Thread> getTestThreads() {
+ return testThreads;
+ }
}
/**