You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:36 UTC
[35/39] hadoop git commit: Revert "HADOOP-12579. Deprecate and remove
WriteableRPCEngine. Contributed by Kai Zheng"
Revert "HADOOP-12579. Deprecate and remove WriteableRPCEngine. Contributed by Kai Zheng"
This reverts commit a6c79f92d503c664f2d109355b719124f29a30e5.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93d8a7f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93d8a7f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93d8a7f2
Branch: refs/heads/HDFS-1312
Commit: 93d8a7f2a2d72a1719d02b1ed90678397900b6ed
Parents: 4e1f56e
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Jun 1 08:41:15 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Jun 1 08:41:15 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +-
.../main/java/org/apache/hadoop/ipc/RPC.java | 21 +-
.../main/java/org/apache/hadoop/ipc/Server.java | 4 +-
.../apache/hadoop/ipc/WritableRpcEngine.java | 564 +++++++++++++++++++
.../hadoop/security/UserGroupInformation.java | 4 +-
.../java/org/apache/hadoop/util/ProtoUtil.java | 2 +
.../src/main/proto/RpcHeader.proto | 2 +-
.../org/apache/hadoop/ipc/RPCCallBenchmark.java | 38 +-
.../hadoop/ipc/TestMultipleProtocolServer.java | 236 +++++++-
.../apache/hadoop/ipc/TestRPCCallBenchmark.java | 13 +
.../apache/hadoop/ipc/TestRPCCompatibility.java | 242 +++++++-
.../apache/hadoop/ipc/TestRPCWaitForProxy.java | 37 +-
.../java/org/apache/hadoop/ipc/TestRpcBase.java | 50 +-
.../java/org/apache/hadoop/ipc/TestSaslRPC.java | 91 +--
.../hadoop/security/TestDoAsEffectiveUser.java | 291 ++++++----
.../security/TestUserGroupInformation.java | 28 +-
.../hadoop-common/src/test/proto/test.proto | 4 +-
.../src/test/proto/test_rpc_service.proto | 4 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 3 +
.../TestClientProtocolWithDelegationToken.java | 119 ++++
.../mapreduce/v2/hs/server/HSAdminServer.java | 3 +
21 files changed, 1448 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 7c11e22..0f43fc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -67,7 +67,7 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ThreadLocal<AsyncGet<Message, Exception>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
- static { // Register the rpcRequest deserializer for ProtobufRpcEngine
+ static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
new Server.ProtoBufRpcInvoker());
@@ -201,8 +201,7 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (args.length != 2) { // RpcController + Message
- throw new ServiceException(
- "Too many or few parameters for request. Method: ["
+ throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index a544f2f..3f68d63 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.ipc;
-import java.io.IOException;
-import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
@@ -28,6 +26,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
+import java.io.*;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,12 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.*;
+
import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
@@ -56,6 +54,7 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -85,10 +84,10 @@ public class RPC {
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
- // 2 for WritableRpcEngine, obsolete and removed
+ RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
- private final short value;
+ public final short value; //TODO make it private
RpcKind(short val) {
this.value = val;
@@ -208,7 +207,7 @@ public class RPC {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
- ProtobufRpcEngine.class);
+ WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
@@ -950,10 +949,10 @@ public class RPC {
return new VerProtocolImpl(highestVersion, highest);
}
- protected Server(String bindAddress, int port,
+ protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler,
- Configuration conf, String serverName,
+ Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index be46e76..88c1f3c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -243,14 +243,14 @@ public abstract class Server {
static class RpcKindMapValue {
final Class<? extends Writable> rpcRequestWrapperClass;
final RpcInvoker rpcInvoker;
-
RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) {
this.rpcInvoker = rpcInvoker;
this.rpcRequestWrapperClass = rpcRequestWrapperClass;
}
}
- static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<>(4);
+ static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
+ HashMap<RPC.RpcKind, RpcKindMapValue>(4);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
new file mode 100644
index 0000000..a9dbb41
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+
+import java.net.InetSocketAddress;
+import java.io.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.*;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+
+/** An RpcEngine implementation for Writable data. */
+@InterfaceStability.Evolving
+public class WritableRpcEngine implements RpcEngine {
+ private static final Log LOG = LogFactory.getLog(RPC.class);
+
+ //writableRpcVersion should be updated if there is a change
+ //in format of the rpc messages.
+
+ // 2L - added declared class to Invocation
+ public static final long writableRpcVersion = 2L;
+
+ /**
+ * Whether or not this class has been initialized.
+ */
+ private static boolean isInitialized = false;
+
+ static {
+ ensureInitialized();
+ }
+
+ /**
+ * Initialize this class if it isn't already.
+ */
+ public static synchronized void ensureInitialized() {
+ if (!isInitialized) {
+ initialize();
+ }
+ }
+
+ /**
+ * Register the rpcRequest deserializer for WritableRpcEngine
+ */
+ private static synchronized void initialize() {
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
+ Invocation.class, new Server.WritableRpcInvoker());
+ isInitialized = true;
+ }
+
+
+ /** A method invocation, including the method name and its parameters.*/
+ private static class Invocation implements Writable, Configurable {
+ private String methodName;
+ private Class<?>[] parameterClasses;
+ private Object[] parameters;
+ private Configuration conf;
+ private long clientVersion;
+ private int clientMethodsHash;
+ private String declaringClassProtocolName;
+
+ //This could be different from static writableRpcVersion when received
+ //at server, if client is using a different version.
+ private long rpcVersion;
+
+ @SuppressWarnings("unused") // called when deserializing an invocation
+ public Invocation() {}
+
+ public Invocation(Method method, Object[] parameters) {
+ this.methodName = method.getName();
+ this.parameterClasses = method.getParameterTypes();
+ this.parameters = parameters;
+ rpcVersion = writableRpcVersion;
+ if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ //VersionedProtocol is exempted from version check.
+ clientVersion = 0;
+ clientMethodsHash = 0;
+ } else {
+ this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
+ this.clientMethodsHash = ProtocolSignature.getFingerprint(method
+ .getDeclaringClass().getMethods());
+ }
+ this.declaringClassProtocolName =
+ RPC.getProtocolName(method.getDeclaringClass());
+ }
+
+ /** The name of the method invoked. */
+ public String getMethodName() { return methodName; }
+
+ /** The parameter classes. */
+ public Class<?>[] getParameterClasses() { return parameterClasses; }
+
+ /** The parameter instances. */
+ public Object[] getParameters() { return parameters; }
+
+ private long getProtocolVersion() {
+ return clientVersion;
+ }
+
+ @SuppressWarnings("unused")
+ private int getClientMethodsHash() {
+ return clientMethodsHash;
+ }
+
+ /**
+ * Returns the rpc version used by the client.
+ * @return rpcVersion
+ */
+ public long getRpcVersion() {
+ return rpcVersion;
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void readFields(DataInput in) throws IOException {
+ rpcVersion = in.readLong();
+ declaringClassProtocolName = UTF8.readString(in);
+ methodName = UTF8.readString(in);
+ clientVersion = in.readLong();
+ clientMethodsHash = in.readInt();
+ parameters = new Object[in.readInt()];
+ parameterClasses = new Class[parameters.length];
+ ObjectWritable objectWritable = new ObjectWritable();
+ for (int i = 0; i < parameters.length; i++) {
+ parameters[i] =
+ ObjectWritable.readObject(in, objectWritable, this.conf);
+ parameterClasses[i] = objectWritable.getDeclaredClass();
+ }
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(rpcVersion);
+ UTF8.writeString(out, declaringClassProtocolName);
+ UTF8.writeString(out, methodName);
+ out.writeLong(clientVersion);
+ out.writeInt(clientMethodsHash);
+ out.writeInt(parameterClasses.length);
+ for (int i = 0; i < parameterClasses.length; i++) {
+ ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+ conf, true);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(methodName);
+ buffer.append("(");
+ for (int i = 0; i < parameters.length; i++) {
+ if (i != 0)
+ buffer.append(", ");
+ buffer.append(parameters[i]);
+ }
+ buffer.append(")");
+ buffer.append(", rpc version="+rpcVersion);
+ buffer.append(", client version="+clientVersion);
+ buffer.append(", methodsFingerPrint="+clientMethodsHash);
+ return buffer.toString();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ }
+
+ private static ClientCache CLIENTS=new ClientCache();
+
+ private static class Invoker implements RpcInvocationHandler {
+ private Client.ConnectionId remoteId;
+ private Client client;
+ private boolean isClosed = false;
+ private final AtomicBoolean fallbackToSimpleAuth;
+
+ public Invoker(Class<?> protocol,
+ InetSocketAddress address, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
+ this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
+ ticket, rpcTimeout, null, conf);
+ this.client = CLIENTS.getClient(conf, factory);
+ this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ long startTime = 0;
+ if (LOG.isDebugEnabled()) {
+ startTime = Time.now();
+ }
+
+ // if Tracing is on then start a new span for this rpc.
+ // guard it in the if statement to make sure there isn't
+ // any extra string manipulation.
+ Tracer tracer = Tracer.curThreadTracer();
+ TraceScope traceScope = null;
+ if (tracer != null) {
+ traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
+ }
+ ObjectWritable value;
+ try {
+ value = (ObjectWritable)
+ client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
+ remoteId, fallbackToSimpleAuth);
+ } finally {
+ if (traceScope != null) traceScope.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ long callTime = Time.now() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+ return value.get();
+ }
+
+ /* close the IPC client that's responsible for this invoker's RPCs */
+ @Override
+ synchronized public void close() {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
+
+ @Override
+ public ConnectionId getConnectionId() {
+ return remoteId;
+ }
+ }
+
+ // for unit testing only
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ static Client getClient(Configuration conf) {
+ return CLIENTS.getClient(conf);
+ }
+
+ /** Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ * @param <T>*/
+ @Override
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout, RetryPolicy connectionRetryPolicy)
+ throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ rpcTimeout, connectionRetryPolicy, null);
+ }
+
+ /** Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ * @param <T>*/
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
+
+ if (connectionRetryPolicy != null) {
+ throw new UnsupportedOperationException(
+ "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
+ }
+
+ T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+ new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+ factory, rpcTimeout, fallbackToSimpleAuth));
+ return new ProtocolProxy<T>(protocol, proxy, true);
+ }
+
+ /* Construct a server for a protocol implementation instance listening on a
+ * port and address. */
+ @Override
+ public RPC.Server getServer(Class<?> protocolClass,
+ Object protocolImpl, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager,
+ String portRangeConfig)
+ throws IOException {
+ return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
+ portRangeConfig);
+ }
+
+
+ /** An RPC Server. */
+ public static class Server extends RPC.Server {
+ /**
+ * Construct an RPC server.
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ *
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
+ */
+ @Deprecated
+ public Server(Object instance, Configuration conf, String bindAddress,
+ int port) throws IOException {
+ this(null, instance, conf, bindAddress, port);
+ }
+
+
+ /** Construct an RPC server.
+ * @param protocolClass class
+ * @param protocolImpl the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ */
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port)
+ throws IOException {
+ this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
+ false, null, null);
+ }
+
+ /**
+ * Construct an RPC server.
+ * @param protocolImpl the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ *
+ * @deprecated use Server#Server(Class, Object,
+ * Configuration, String, int, int, int, int, boolean, SecretManager, String)
+ */
+ @Deprecated
+ public Server(Object protocolImpl, Configuration conf, String bindAddress,
+ int port, int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ this(null, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose,
+ secretManager, null);
+
+ }
+
+ /**
+ * Construct an RPC server.
+ * @param protocolClass - the protocol being registered
+ * can be null for compatibility with old usage (see below for details)
+ * @param protocolImpl the protocol impl that will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ */
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
+ String portRangeConfig)
+ throws IOException {
+ super(bindAddress, port, null, numHandlers, numReaders,
+ queueSizePerHandler, conf,
+ classNameBase(protocolImpl.getClass().getName()), secretManager,
+ portRangeConfig);
+
+ this.verbose = verbose;
+
+
+ Class<?>[] protocols;
+ if (protocolClass == null) { // derive protocol from impl
+ /*
+ * In order to remain compatible with the old usage where a single
+ * target protocolImpl is supplied for all protocol interfaces, and
+ * the protocolImpl is derived from the protocolClass(es)
+ * we register all interfaces extended by the protocolImpl
+ */
+ protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
+
+ } else {
+ if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+ throw new IOException("protocolClass "+ protocolClass +
+ " is not implemented by protocolImpl which is of class " +
+ protocolImpl.getClass());
+ }
+ // register protocol class and its super interfaces
+ registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+ protocols = RPC.getProtocolInterfaces(protocolClass);
+ }
+ for (Class<?> p : protocols) {
+ if (!p.equals(VersionedProtocol.class)) {
+ registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
+ }
+ }
+
+ }
+
+ private static void log(String value) {
+ if (value!= null && value.length() > 55)
+ value = value.substring(0, 55)+"...";
+ LOG.info(value);
+ }
+
+ static class WritableRpcInvoker implements RpcInvoker {
+
+ @Override
+ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+ String protocolName, Writable rpcRequest, long receivedTime)
+ throws IOException, RPC.VersionMismatch {
+
+ Invocation call = (Invocation)rpcRequest;
+ if (server.verbose) log("Call: " + call);
+
+ // Verify writable rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new RpcServerException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
+
+ long clientVersion = call.getProtocolVersion();
+ final String protoName;
+ ProtoClassProtoImpl protocolImpl;
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+ // VersionedProtocol methods are often used by client to figure out
+ // which version of protocol to use.
+ //
+ // Versioned protocol methods should go to the protocolName protocol
+ // rather than the declaring class of the method, since the
+ // declaring class is VersionedProtocol, which is not
+ // registered directly.
+ // Send the call to the highest protocol version
+ VerProtocolImpl highest = server.getHighestSupportedProtocol(
+ RPC.RpcKind.RPC_WRITABLE, protocolName);
+ if (highest == null) {
+ throw new RpcServerException("Unknown protocol: " + protocolName);
+ }
+ protocolImpl = highest.protocolTarget;
+ } else {
+ protoName = call.declaringClassProtocolName;
+
+ // Find the right impl for the protocol based on client version.
+ ProtoNameVer pv =
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+ protocolImpl =
+ server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
+ protoName);
+ if (highest == null) {
+ throw new RpcServerException("Unknown protocol: " + protoName);
+ } else { // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
+ }
+ }
+
+ // Invoke the protocol method
+ long startTime = Time.now();
+ int qTime = (int) (startTime-receivedTime);
+ Exception exception = null;
+ try {
+ Method method =
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+ server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ Object value =
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
+ if (server.verbose) log("Return: "+value);
+ return new ObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ exception = (IOException)target;
+ throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ exception = ioe;
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
+ exception = ioe;
+ throw ioe;
+ } finally {
+ int processingTime = (int) (Time.now() - startTime);
+ if (LOG.isDebugEnabled()) {
+ String msg = "Served: " + call.getMethodName() +
+ " queueTime= " + qTime + " procesingTime= " + processingTime;
+ if (exception != null) {
+ msg += " exception= " + exception.getClass().getSimpleName();
+ }
+ LOG.debug(msg);
+ }
+ String detailedMetricsName = (exception == null) ?
+ call.getMethodName() :
+ exception.getClass().getSimpleName();
+ server.updateMetrics(detailedMetricsName, qTime, processingTime);
+ }
+ }
+ }
+ }
+
+ @Override
+ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
+ ConnectionId connId, Configuration conf, SocketFactory factory)
+ throws IOException {
+ throw new UnsupportedOperationException("This proxy is not supported");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index aa334f3..798aa01 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -689,7 +689,7 @@ public class UserGroupInformation {
*
* @param user The principal name to load from the ticket
* cache
- * @param ticketCache the path to the ticket cache file
+ * @param ticketCachePath the path to the ticket cache file
*
* @throws IOException if the kerberos login fails
*/
@@ -749,7 +749,7 @@ public class UserGroupInformation {
/**
* Create a UserGroupInformation from a Subject with Kerberos principal.
*
- * @param subject The KerberosPrincipal to use in UGI
+ * @param user The KerberosPrincipal to use in UGI
*
* @throws IOException if the kerberos login fails
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
index 04e14e8..1a5acba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
@@ -146,6 +146,7 @@ public abstract class ProtoUtil {
static RpcKindProto convert(RPC.RpcKind kind) {
switch (kind) {
case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN;
+ case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER;
}
return null;
@@ -155,6 +156,7 @@ public abstract class ProtoUtil {
public static RPC.RpcKind convert( RpcKindProto kind) {
switch (kind) {
case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN;
+ case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER;
}
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index f1a36ae..aa14616 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -44,10 +44,10 @@ package hadoop.common;
/**
* RpcKind determine the rpcEngine and the serialization of the rpc request
- * Note: 1 for RPC_WRITABLE, WritableRpcEngine, obsolete and removed
*/
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
+ RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
index 9356dab..eb7b949 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
@@ -17,8 +17,13 @@
*/
package org.apache.hadoop.ipc;
-import com.google.common.base.Joiner;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -29,6 +34,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@@ -39,12 +45,8 @@ import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
/**
* Benchmark for protobuf RPC.
@@ -66,7 +68,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
public int secondsToRun = 15;
private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine =
- ProtobufRpcEngine.class;
+ WritableRpcEngine.class;
private MyOptions(String args[]) {
try {
@@ -133,7 +135,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
opts.addOption(
OptionBuilder.withLongOpt("engine").hasArg(true)
- .withArgName("protobuf")
+ .withArgName("writable|protobuf")
.withDescription("engine to use")
.create('e'));
@@ -182,6 +184,8 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class;
+ } else if ("writable".equals(eng)) {
+ rpcEngine = WritableRpcEngine.class;
} else {
throw new ParseException("invalid engine: " + eng);
}
@@ -233,6 +237,11 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(opts.host).setPort(opts.getPort())
.setNumHandlers(opts.serverThreads).setVerbose(false).build();
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
+ .setPort(opts.getPort()).setNumHandlers(opts.serverThreads)
+ .setVerbose(false).build();
} else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
}
@@ -390,6 +399,15 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
return responseProto.getMessage();
}
};
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ final TestProtocol proxy = RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+ return new RpcServiceWrapper() {
+ @Override
+ public String doEcho(String msg) throws Exception {
+ return proxy.echo(msg);
+ }
+ };
} else {
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
index 10e23ba..8b419e3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
@@ -17,28 +17,252 @@
*/
package org.apache.hadoop.ipc;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
+import com.google.protobuf.BlockingService;
public class TestMultipleProtocolServer extends TestRpcBase {
-
+ private static InetSocketAddress addr;
private static RPC.Server server;
- @Before
- public void setUp() throws Exception {
- super.setupConf();
+ private static Configuration conf = new Configuration();
+
+
+ @ProtocolInfo(protocolName="Foo") // Foo0, Foo1 and FooUnimplemented all declare the protocol name "Foo"
+ interface Foo0 extends VersionedProtocol { // version 0 of "Foo"
+ public static final long versionID = 0L;
+ String ping() throws IOException; // the only call declared in version 0
+
+ }
+
+ @ProtocolInfo(protocolName="Foo") // same protocol name as Foo0; only versionID and the method set differ
+ interface Foo1 extends VersionedProtocol { // version 1 of "Foo"
+ public static final long versionID = 1L;
+ String ping() throws IOException;
+ String ping2() throws IOException; // declared here but not in Foo0
+ }
+
+ @ProtocolInfo(protocolName="Foo")
+ interface FooUnimplemented extends VersionedProtocol { // version 2 of "Foo"; setUp() registers no implementation of it
+ public static final long versionID = 2L;
+ String ping() throws IOException;
+ }
+
+ interface Mixin extends VersionedProtocol{ // base protocol that Bar extends
+ public static final long versionID = 0L;
+ void hello() throws IOException;
+ }
+
+ interface Bar extends Mixin { // inherits hello() from Mixin and adds echo(int)
+ public static final long versionID = 0L; // redeclared here; same value as Mixin.versionID
+ int echo(int i) throws IOException;
+ }
+
+ class Foo0Impl implements Foo0 {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo0.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
- server = setupTestServer(conf, 2);
+ @Override
+ public String ping() {
+ return "Foo0";
+ }
+
}
+
+ class Foo1Impl implements Foo1 { // implementation of version 1 of protocol "Foo"
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo1.versionID; // both arguments are ignored
+ }
+ // Builds the signature from the first interface this class declares, which is Foo1.
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e); // any failure of the lookup or cast is reported as IOException
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+ // ping() and ping2() both return the literal "Foo1"; test1 asserts on that value.
+ @Override
+ public String ping() {
+ return "Foo1";
+ }
+
+ @Override
+ public String ping2() {
+ return "Foo1";
+
+ }
+
+ }
+
+
+ class BarImpl implements Bar {
+ // Implements Bar and, through it, Mixin; setUp() registers one instance for each of the two.
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Bar.versionID; // both arguments are ignored
+ }
+ // Builds the signature from the first interface this class declares, which is Bar.
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e); // any failure of the lookup or cast is reported as IOException
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public int echo(int i) {
+ return i; // returns the argument unchanged
+ }
+
+ @Override
+ public void hello() {
+ // no-op
+
+ }
+ }
+ @Before
+ public void setUp() throws Exception {
+ // Build the server around Foo0/Foo0Impl with two handlers; port 0 presumably lets the OS pick a free port.
+ server = new RPC.Builder(conf).setProtocol(Foo0.class)
+ .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+ // The three addProtocol calls above put Foo1, Bar and Mixin on the same server as Writable protocols.
+
+ // Add Protobuf server
+ // Create server side implementation
+ PBServerImpl pbServerImpl = new PBServerImpl();
+ BlockingService service = TestProtobufRpcProto
+ .newReflectiveBlockingService(pbServerImpl);
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+ service);
+ server.start();
+ addr = NetUtils.getConnectAddress(server); // address the tests hand to RPC.getProtocolProxy
+ }
+
@After
public void tearDown() throws Exception {
server.stop();
}
+ @Test
+ public void test1() throws IOException { // exercises the Writable protocols registered in setUp()
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
+ Foo0 foo0 = (Foo0)proxy.getProxy();
+ Assert.assertEquals("Foo0", foo0.ping());
+
+ // Version 1 of "Foo" must be answered by Foo1Impl, which returns "Foo1".
+ proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+
+ // Call both methods of Foo1; ping2() is the one that Foo0 lacks.
+ Foo1 foo1 = (Foo1)proxy.getProxy();
+ Assert.assertEquals("Foo1", foo1.ping());
+ Assert.assertEquals("Foo1", foo1.ping2());
+
+ // Bar is requested with its own versionID rather than Foo1's.
+ proxy = RPC.getProtocolProxy(Bar.class, Bar.versionID, addr, conf);
+
+ // BarImpl.echo returns its argument unchanged.
+ Bar bar = (Bar)proxy.getProxy();
+ Assert.assertEquals(99, bar.echo(99));
+
+ // Now test Mixin class method
+ // hello() returns nothing; the check is only that the call does not throw.
+ Mixin mixin = bar;
+ mixin.hello();
+ }
+
+
+ // setUp() registers Foo0 and Foo1 but nothing for FooUnimplemented (version 2 of "Foo").
+ // A call made through a FooUnimplemented proxy is therefore expected to throw IOException.
+ @Test(expected=IOException.class)
+ public void testNonExistingProtocol() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+ // testNonExistingProtocol2 obtains the same proxy without error, so the failure should come from ping().
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ foo.ping();
+ }
+
+ /**
+ * Asking for the version of "Foo" through the unimplemented version-2 proxy must return
+ * Foo1.versionID, the highest version registered in setUp(); getProtocolSignature must not throw.
+ * @throws IOException if either RPC fails
+ */
+ @Test
+ public void testNonExistingProtocol2() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+ // Both calls below are inherited from VersionedProtocol; FooUnimplemented itself declares only ping().
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ Assert.assertEquals(Foo1.versionID,
+ foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID));
+ foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID, 0); // result is discarded; only completion without an exception is checked
+ }
+
+ @Test(expected=IOException.class)
+ public void testIncorrectServerCreation() throws IOException { // Foo0Impl does not implement Foo1, so build() must throw
+ new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
+ .build();
+ }
+
// Now test a PB service - a server hosts both PB and Writable Rpcs.
@Test
public void testPBService() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
index 6d83d7d..969f728 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
@@ -26,6 +26,19 @@ import org.junit.Test;
public class TestRPCCallBenchmark {
@Test(timeout=20000)
+ public void testBenchmarkWithWritable() throws Exception {
+ int rc = ToolRunner.run(new RPCCallBenchmark(),
+ new String[] {
+ "--clientThreads", "30",
+ "--serverThreads", "30",
+ "--time", "5",
+ "--serverReaderThreads", "4",
+ "--messageSize", "1024",
+ "--engine", "writable"});
+ assertEquals(0, rc);
+ }
+
+ @Test(timeout=20000)
public void testBenchmarkWithProto() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(),
new String[] {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
index a06d9fd..2ac2be9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
@@ -18,20 +18,28 @@
package org.apache.hadoop.ipc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
/** Unit test for supporting method-name based compatible RPCs. */
public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0";
@@ -41,7 +49,7 @@ public class TestRPCCompatibility {
public static final Log LOG =
LogFactory.getLog(TestRPCCompatibility.class);
-
+
private static Configuration conf = new Configuration();
public interface TestProtocol0 extends VersionedProtocol {
@@ -112,21 +120,6 @@ public class TestRPCCompatibility {
@Before
public void setUp() {
ProtocolSignature.resetCache();
-
- RPC.setProtocolEngine(conf,
- TestProtocol0.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol1.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol2.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol3.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol4.class, ProtobufRpcEngine.class);
}
@After
@@ -140,7 +133,117 @@ public class TestRPCCompatibility {
server = null;
}
}
+
+ @Test // old client vs new server
+ public void testVersion0ClientVersion1Server() throws Exception {
+ // Server: TestProtocol1 backed by a TestImpl1, two handlers; the same instance is also registered for TestProtocol0.
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ // Client: talks to that server through a TestProtocol0 proxy only.
+ proxy = RPC.getProtocolProxy(
+ TestProtocol0.class, TestProtocol0.versionID, addr, conf);
+ // The test passes if ping() completes without throwing; no result is inspected.
+ TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
+ proxy0.ping();
+ }
+
+ @Test // new client vs old server
+ public void testVersion1ClientVersion0Server() throws Exception {
+ // Server: TestProtocol0 backed by a TestImpl0, two handlers; no other protocol is registered.
+ server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
+ .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ // Client: uses the newer TestProtocol1 interface against that server.
+ proxy = RPC.getProtocolProxy(
+ TestProtocol1.class, TestProtocol1.versionID, addr, conf);
+ // ping() must still succeed; echo(String) must be rejected with an IOException.
+ TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
+ proxy1.ping();
+ try {
+ proxy1.echo("hello");
+ fail("Echo should fail");
+ } catch(IOException e) { // expected outcome: the server was built for TestProtocol0 only
+ }
+ }
+
+ private class Version2Client { // test-side client that adapts to what the server reports as supported
+ private TestProtocol2 proxy2;
+ private ProtocolProxy<TestProtocol2> serverInfo;
+ // Connects to whatever address the enclosing test stored in addr before constructing the client.
+ private Version2Client() throws IOException {
+ serverInfo = RPC.getProtocolProxy(
+ TestProtocol2.class, TestProtocol2.versionID, addr, conf);
+ proxy2 = serverInfo.getProxy();
+ }
+ // Chooses a code path depending on whether the server reports echo(int) as supported.
+ public int echo(int value) throws IOException, NumberFormatException {
+ if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
+ return -value; // no RPC is made on this path; the negated value lets tests see that it was taken
+ } else { // echo(int) unavailable: fall back to echo(String) and parse the reply
+System.out.println("echo int is NOT supported");
+ return Integer.parseInt(proxy2.echo(String.valueOf(value)));
+ }
+ }
+
+ public String echo(String value) throws IOException {
+ return proxy2.echo(value);
+ }
+
+ public void ping() throws IOException {
+ proxy2.ping();
+ }
+ }
+
+ @Test // Compatible new client & old server
+ public void testVersion2ClientVersion1Server() throws Exception {
+ // Server: TestProtocol1 backed by a TestImpl1, two handlers; the same instance is also registered for TestProtocol0.
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ // Version2Client's constructor reads the addr field assigned just above.
+
+ Version2Client client = new Version2Client();
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // The server does not support echo(int), so the client falls back to echo(String) and gets 3 back (not -3).
+ // This verifies that echo(int) and echo(String)'s hash codes are different
+ assertEquals(3, client.echo(3));
+ }
+
+ @Test // equal version client and server
+ public void testVersion2ClientVersion2Server() throws Exception {
+ // Server: TestProtocol2 backed by a TestImpl2, two handlers; the same instance is also registered for TestProtocol0.
+ TestImpl2 impl = new TestImpl2();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ Version2Client client = new Version2Client();
+ // ping() and echo(String) are expected to work as in the older-server test.
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // echo(int) is now reported as supported, so Version2Client takes its negating branch and returns -3
+ assertEquals(-3, client.echo(3));
+ }
+
public interface TestProtocol3 {
int echo(String value);
int echo(int value);
@@ -194,4 +297,97 @@ public class TestRPCCompatibility {
@Override
int echo(int value) throws IOException;
}
+
+ @Test
+ public void testVersionMismatch() throws IOException { // a TestProtocol4 client against a TestProtocol2 server
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ // This local proxy shadows the proxy field of the test class.
+ TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+ TestProtocol4.versionID, addr, conf);
+ try {
+ proxy.echo(21);
+ fail("The call must throw VersionMismatch exception");
+ } catch (RemoteException ex) {
+ Assert.assertEquals(RPC.VersionMismatch.class.getName(),
+ ex.getClassName());
+ Assert.assertEquals(RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH,
+ ex.getErrorCode()); // assertEquals reports both values and tolerates a null error code
+ } catch (IOException ex) {
+ fail("Expected version mismatch but got " + ex);
+ }
+ }
+
+ @Test
+ public void testIsMethodSupported() throws IOException { // checks RpcClientUtil.isMethodSupported per RPC kind
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ // The same proxy and method name are queried twice; only the RpcKind argument differs.
+ TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+ TestProtocol2.versionID, addr, conf);
+ boolean supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertTrue(supported); // "echo" must be reported for the Writable kind
+ supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertFalse(supported); // and must not be reported for the protobuf kind
+ }
+
+ /**
+ * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+ * the server registry to extract protocol signatures and versions.
+ */
+ @Test
+ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ // The translator is constructed around the server object and called directly; no client proxy is created here.
+ ProtocolMetaInfoServerSideTranslatorPB xlator =
+ new ProtocolMetaInfoServerSideTranslatorPB(server);
+ // First query: TestProtocol1 under the protobuf kind.
+ GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER));
+ //No signatures should be found
+ Assert.assertEquals(0, resp.getProtocolSignatureCount());
+ resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RPC.RpcKind.RPC_WRITABLE));
+ Assert.assertEquals(1, resp.getProtocolSignatureCount()); // second query, Writable kind: exactly one signature
+ ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+ Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+ boolean found = false; // set once the fingerprint of echo(String) is seen in the signature's method list
+ int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+ .getMethod("echo", String.class));
+ for (int m : sig.getMethodsList()) {
+ if (expected == m) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue(found);
+ }
+
+ private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+ Class<?> protocol, RPC.RpcKind rpcKind) { // builds the request message used by the translator test
+ GetProtocolSignatureRequestProto.Builder builder =
+ GetProtocolSignatureRequestProto.newBuilder();
+ builder.setProtocol(protocol.getName()); // protocol is identified by the class name
+ builder.setRpcKind(rpcKind.toString()); // RPC kind is sent as the string form of the enum constant
+ return builder.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
index b22f91b..5807998 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -28,13 +30,11 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
-
/**
* tests that the proxy can be interrupted
*/
-public class TestRPCWaitForProxy extends TestRpcBase {
+public class TestRPCWaitForProxy extends Assert {
+ private static final String ADDRESS = "0.0.0.0";
private static final Logger
LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
@@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase {
*
* @throws Throwable any exception other than that which was expected
*/
- @Test(timeout = 50000)
+ @Test(timeout = 10000)
public void testWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(0);
worker.start();
worker.join();
Throwable caught = worker.getCaught();
- Throwable cause = caught.getCause();
- Assert.assertNotNull("No exception was raised", cause);
- if (!(cause instanceof ConnectException)) {
+ assertNotNull("No exception was raised", caught);
+ if (!(caught instanceof ConnectException)) {
throw caught;
}
}
@@ -70,11 +69,11 @@ public class TestRPCWaitForProxy extends TestRpcBase {
RpcThread worker = new RpcThread(100);
worker.start();
Thread.sleep(1000);
- Assert.assertTrue("worker hasn't started", worker.waitStarted);
+ assertTrue("worker hasn't started", worker.waitStarted);
worker.interrupt();
worker.join();
Throwable caught = worker.getCaught();
- Assert.assertNotNull("No exception was raised", caught);
+ assertNotNull("No exception was raised", caught);
// looking for the root cause here, which can be wrapped
// as part of the NetUtils work. Having this test look
// a the type of exception there would be brittle to improvements
@@ -83,8 +82,6 @@ public class TestRPCWaitForProxy extends TestRpcBase {
if (cause == null) {
// no inner cause, use outer exception as root cause.
cause = caught;
- } else if (cause.getCause() != null) {
- cause = cause.getCause();
}
if (!(cause instanceof InterruptedIOException)
&& !(cause instanceof ClosedByInterruptException)) {
@@ -115,16 +112,12 @@ public class TestRPCWaitForProxy extends TestRpcBase {
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
connectRetries);
waitStarted = true;
-
- short invalidPort = 20;
- InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
- invalidPort);
- TestRpcBase.TestRpcService proxy = RPC.getProxy(
- TestRpcBase.TestRpcService.class,
- 1L, invalidAddress, conf);
- // Test echo method
- proxy.echo(null, newEchoRequest("hello"));
-
+ TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+ TestProtocol.versionID,
+ new InetSocketAddress(ADDRESS, 20),
+ config,
+ 15000L);
+ proxy.echo("");
} catch (Throwable throwable) {
caught = throwable;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 5a8f8d0..bc604a4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -112,8 +112,7 @@ public class TestRpcBase {
return setupTestServer(builder);
}
- protected static RPC.Server setupTestServer(
- RPC.Builder builder) throws IOException {
+ protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
RPC.Server server = builder.build();
server.start();
@@ -176,21 +175,17 @@ public class TestRpcBase {
public TestTokenIdentifier() {
this(new Text(), new Text());
}
-
public TestTokenIdentifier(Text tokenid) {
this(tokenid, new Text());
}
-
public TestTokenIdentifier(Text tokenid, Text realUser) {
this.tokenid = tokenid == null ? new Text() : tokenid;
this.realUser = realUser == null ? new Text() : realUser;
}
-
@Override
public Text getKind() {
return KIND_NAME;
}
-
@Override
public UserGroupInformation getUser() {
if (realUser.toString().isEmpty()) {
@@ -208,7 +203,6 @@ public class TestRpcBase {
tokenid.readFields(in);
realUser.readFields(in);
}
-
@Override
public void write(DataOutput out) throws IOException {
tokenid.write(out);
@@ -240,7 +234,7 @@ public class TestRpcBase {
@SuppressWarnings("unchecked")
@Override
public Token<TestTokenIdentifier> selectToken(Text service,
- Collection<Token<? extends TokenIdentifier>> tokens) {
+ Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
@@ -394,17 +388,19 @@ public class TestRpcBase {
}
@Override
- public TestProtos.UserResponseProto getAuthUser(
+ public TestProtos.AuthUserResponseProto getAuthUser(
RpcController controller, TestProtos.EmptyRequestProto request)
throws ServiceException {
- UserGroupInformation authUser;
+ UserGroupInformation authUser = null;
try {
authUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new ServiceException(e);
}
- return newUserResponse(authUser.getUserName());
+ return TestProtos.AuthUserResponseProto.newBuilder()
+ .setAuthUser(authUser.getUserName())
+ .build();
}
@Override
@@ -436,34 +432,6 @@ public class TestRpcBase {
return TestProtos.EmptyResponseProto.newBuilder().build();
}
-
- @Override
- public TestProtos.UserResponseProto getCurrentUser(
- RpcController controller,
- TestProtos.EmptyRequestProto request) throws ServiceException {
- String user;
- try {
- user = UserGroupInformation.getCurrentUser().toString();
- } catch (IOException e) {
- throw new ServiceException("Failed to get current user", e);
- }
-
- return newUserResponse(user);
- }
-
- @Override
- public TestProtos.UserResponseProto getServerRemoteUser(
- RpcController controller,
- TestProtos.EmptyRequestProto request) throws ServiceException {
- String serverRemoteUser = Server.getRemoteUser().toString();
- return newUserResponse(serverRemoteUser);
- }
-
- private TestProtos.UserResponseProto newUserResponse(String user) {
- return TestProtos.UserResponseProto.newBuilder()
- .setUser(user)
- .build();
- }
}
protected static TestProtos.EmptyRequestProto newEmptyRequest() {
@@ -510,4 +478,8 @@ public class TestRpcBase {
}
return null;
}
+
+ protected static String convert(TestProtos.AuthUserResponseProto response) {
+ return response.getAuthUser(); // unwraps the authUser field of the response message
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d8a7f2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
index 3809448..ec53e8c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
@@ -29,25 +29,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.SaslInputStream;
-import org.apache.hadoop.security.SaslPlainServer;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SaslRpcClient;
-import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.*;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.TestUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.*;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.security.token.TokenSelector;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
@@ -57,55 +44,30 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import javax.security.auth.callback.*;
+import javax.security.sasl.*;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*;
+import static org.junit.Assert.*;
/** Unit tests for using Sasl over RPC. */
@RunWith(Parameterized.class)
public class TestSaslRPC extends TestRpcBase {
@Parameters
public static Collection<Object[]> data() {
- Collection<Object[]> params = new ArrayList<>();
+ Collection<Object[]> params = new ArrayList<Object[]>();
for (QualityOfProtection qop : QualityOfProtection.values()) {
params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null });
}
@@ -151,7 +113,7 @@ public class TestSaslRPC extends TestRpcBase {
NONE(),
VALID(),
INVALID(),
- OTHER()
+ OTHER();
}
@BeforeClass
@@ -267,7 +229,7 @@ public class TestSaslRPC extends TestRpcBase {
final Server server = setupTestServer(conf, 5, sm);
doDigestRpc(server, sm);
} finally {
- SecurityUtil.setSecurityInfoProviders();
+ SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
}
}
@@ -296,7 +258,7 @@ public class TestSaslRPC extends TestRpcBase {
addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()));
- Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId, sm);
SecurityUtil.setTokenService(token, addr);
current.addToken(token);
@@ -324,8 +286,8 @@ public class TestSaslRPC extends TestRpcBase {
// set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
- ConnectionId remoteId = ConnectionId.getConnectionId(
- new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf);
+ ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
+ TestRpcService.class, null, 0, null, newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval());
// set doPing to false
@@ -834,13 +796,13 @@ public class TestSaslRPC extends TestRpcBase {
final TestTokenSecretManager sm = new TestTokenSecretManager();
boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) {
- useSecretManager &= enableSecretManager;
+ useSecretManager &= enableSecretManager.booleanValue();
}
if (forceSecretManager != null) {
- useSecretManager |= forceSecretManager;
+ useSecretManager |= forceSecretManager.booleanValue();
}
final SecretManager<?> serverSm = useSecretManager ? sm : null;
-
+
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
@Override
public Server run() throws IOException {
@@ -895,13 +857,13 @@ public class TestSaslRPC extends TestRpcBase {
proxy.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(),
- proxy.getAuthUser(null, newEmptyRequest()).getUser());
+ convert(proxy.getAuthUser(null, newEmptyRequest())));
AuthMethod authMethod =
convert(proxy.getAuthMethod(null, newEmptyRequest()));
// verify sasl completed with correct QOP
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
- RPC.getConnectionIdForProxy(proxy).getSaslQop());
- return authMethod != null ? authMethod.toString() : null;
+ RPC.getConnectionIdForProxy(proxy).getSaslQop());
+ return authMethod.toString();
} catch (ServiceException se) {
if (se.getCause() instanceof RemoteException) {
throw (RemoteException) se.getCause();
@@ -926,18 +888,21 @@ public class TestSaslRPC extends TestRpcBase {
String actual) {
assertEquals(expect.toString(), actual);
}
-
- private static void assertAuthEquals(Pattern expect, String actual) {
+
+ private static void assertAuthEquals(Pattern expect,
+ String actual) {
// this allows us to see the regexp and the value it didn't match
if (!expect.matcher(actual).matches()) {
- fail(); // it failed
+ assertEquals(expect, actual); // it failed
+ } else {
+ assertTrue(true); // it matched
}
}
/*
* Class used to test overriding QOP values using SaslPropertiesResolver
*/
- static class AuthSaslPropertiesResolver extends SaslPropertiesResolver {
+ static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{
@Override
public Map<String, String> getServerProperties(InetAddress address) {
@@ -946,7 +911,7 @@ public class TestSaslRPC extends TestRpcBase {
return newPropertes;
}
}
-
+
public static void main(String[] args) throws Exception {
System.out.println("Testing Kerberos authentication over RPC");
if (args.length != 2) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org