You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/07 21:53:45 UTC
hadoop git commit: Revert "HADOOP-13218. Migrate other Hadoop side
tests to prepare for removing WritableRPCEngine. Contributed by Wei Zhou and
Kai Zheng"
Repository: hadoop
Updated Branches:
refs/heads/trunk f414d5e11 -> d355573f5
Revert "HADOOP-13218. Migrate other Hadoop side tests to prepare for removing WritableRPCEngine. Contributed by Wei Zhou and Kai Zheng"
This reverts commit 62a9667136ebd8a048f556b534fcff4fdaf8e2ec
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d355573f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d355573f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d355573f
Branch: refs/heads/trunk
Commit: d355573f5681f43e760a1bc23ebed553bd35fca5
Parents: f414d5e
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Sep 8 05:50:17 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Sep 8 05:50:17 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +-
.../main/java/org/apache/hadoop/ipc/RPC.java | 15 +-
.../main/java/org/apache/hadoop/ipc/Server.java | 4 +-
.../hadoop/security/UserGroupInformation.java | 4 +-
.../org/apache/hadoop/ipc/RPCCallBenchmark.java | 38 ++-
.../hadoop/ipc/TestMultipleProtocolServer.java | 236 ++++++++++++++-
.../apache/hadoop/ipc/TestRPCCallBenchmark.java | 13 +
.../apache/hadoop/ipc/TestRPCCompatibility.java | 242 +++++++++++++--
.../apache/hadoop/ipc/TestRPCWaitForProxy.java | 37 +--
.../java/org/apache/hadoop/ipc/TestRpcBase.java | 50 +---
.../java/org/apache/hadoop/ipc/TestSaslRPC.java | 74 ++---
.../hadoop/security/TestDoAsEffectiveUser.java | 291 ++++++++++++-------
.../security/TestUserGroupInformation.java | 28 +-
.../hadoop-common/src/test/proto/test.proto | 4 +-
.../src/test/proto/test_rpc_service.proto | 4 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 3 +
.../TestClientProtocolWithDelegationToken.java | 119 ++++++++
.../mapreduce/v2/hs/server/HSAdminServer.java | 3 +
18 files changed, 876 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index e68bfd4..83e4b9e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -60,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ThreadLocal<AsyncGet<Message, Exception>>
ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
- static { // Register the rpcRequest deserializer for ProtobufRpcEngine
+ static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
@@ -194,8 +194,7 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (args.length != 2) { // RpcController + Message
- throw new ServiceException(
- "Too many or few parameters for request. Method: ["
+ throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 12a07a5..3f68d63 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.ipc;
-import java.io.IOException;
-import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
@@ -28,6 +26,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
+import java.io.*;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,12 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.*;
+
import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
@@ -56,6 +54,7 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -88,7 +87,7 @@ public class RPC {
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
- private final short value;
+ public final short value; //TODO make it private
RpcKind(short val) {
this.value = val;
@@ -208,7 +207,7 @@ public class RPC {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
- ProtobufRpcEngine.class);
+ WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 531d574..f20ba94 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -237,14 +237,14 @@ public abstract class Server {
static class RpcKindMapValue {
final Class<? extends Writable> rpcRequestWrapperClass;
final RpcInvoker rpcInvoker;
-
RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) {
this.rpcInvoker = rpcInvoker;
this.rpcRequestWrapperClass = rpcRequestWrapperClass;
}
}
- static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<>(4);
+ static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
+ HashMap<RPC.RpcKind, RpcKindMapValue>(4);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index ed3a9d0..0ad9abc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -730,7 +730,7 @@ public class UserGroupInformation {
*
* @param user The principal name to load from the ticket
* cache
- * @param ticketCache the path to the ticket cache file
+ * @param ticketCachePath the path to the ticket cache file
*
* @throws IOException if the kerberos login fails
*/
@@ -790,7 +790,7 @@ public class UserGroupInformation {
/**
* Create a UserGroupInformation from a Subject with Kerberos principal.
*
- * @param subject The KerberosPrincipal to use in UGI
+ * @param user The KerberosPrincipal to use in UGI
*
* @throws IOException if the kerberos login fails
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
index 9356dab..eb7b949 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
@@ -17,8 +17,13 @@
*/
package org.apache.hadoop.ipc;
-import com.google.common.base.Joiner;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -29,6 +34,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@@ -39,12 +45,8 @@ import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
/**
* Benchmark for protobuf RPC.
@@ -66,7 +68,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
public int secondsToRun = 15;
private int msgSize = 1024;
public Class<? extends RpcEngine> rpcEngine =
- ProtobufRpcEngine.class;
+ WritableRpcEngine.class;
private MyOptions(String args[]) {
try {
@@ -133,7 +135,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
opts.addOption(
OptionBuilder.withLongOpt("engine").hasArg(true)
- .withArgName("protobuf")
+ .withArgName("writable|protobuf")
.withDescription("engine to use")
.create('e'));
@@ -182,6 +184,8 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
String eng = line.getOptionValue('e');
if ("protobuf".equals(eng)) {
rpcEngine = ProtobufRpcEngine.class;
+ } else if ("writable".equals(eng)) {
+ rpcEngine = WritableRpcEngine.class;
} else {
throw new ParseException("invalid engine: " + eng);
}
@@ -233,6 +237,11 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(opts.host).setPort(opts.getPort())
.setNumHandlers(opts.serverThreads).setVerbose(false).build();
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
+ .setPort(opts.getPort()).setNumHandlers(opts.serverThreads)
+ .setVerbose(false).build();
} else {
throw new RuntimeException("Bad engine: " + opts.rpcEngine);
}
@@ -390,6 +399,15 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
return responseProto.getMessage();
}
};
+ } else if (opts.rpcEngine == WritableRpcEngine.class) {
+ final TestProtocol proxy = RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+ return new RpcServiceWrapper() {
+ @Override
+ public String doEcho(String msg) throws Exception {
+ return proxy.echo(msg);
+ }
+ };
} else {
throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
index 10e23ba..8b419e3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
@@ -17,28 +17,252 @@
*/
package org.apache.hadoop.ipc;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
+import com.google.protobuf.BlockingService;
public class TestMultipleProtocolServer extends TestRpcBase {
-
+ private static InetSocketAddress addr;
private static RPC.Server server;
- @Before
- public void setUp() throws Exception {
- super.setupConf();
+ private static Configuration conf = new Configuration();
+
+
+ @ProtocolInfo(protocolName="Foo")
+ interface Foo0 extends VersionedProtocol {
+ public static final long versionID = 0L;
+ String ping() throws IOException;
+
+ }
+
+ @ProtocolInfo(protocolName="Foo")
+ interface Foo1 extends VersionedProtocol {
+ public static final long versionID = 1L;
+ String ping() throws IOException;
+ String ping2() throws IOException;
+ }
+
+ @ProtocolInfo(protocolName="Foo")
+ interface FooUnimplemented extends VersionedProtocol {
+ public static final long versionID = 2L;
+ String ping() throws IOException;
+ }
+
+ interface Mixin extends VersionedProtocol{
+ public static final long versionID = 0L;
+ void hello() throws IOException;
+ }
+
+ interface Bar extends Mixin {
+ public static final long versionID = 0L;
+ int echo(int i) throws IOException;
+ }
+
+ class Foo0Impl implements Foo0 {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo0.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
- server = setupTestServer(conf, 2);
+ @Override
+ public String ping() {
+ return "Foo0";
+ }
+
}
+
+ class Foo1Impl implements Foo1 {
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo1.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public String ping() {
+ return "Foo1";
+ }
+
+ @Override
+ public String ping2() {
+ return "Foo1";
+
+ }
+
+ }
+
+
+ class BarImpl implements Bar {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Bar.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public int echo(int i) {
+ return i;
+ }
+
+ @Override
+ public void hello() {
+
+
+ }
+ }
+ @Before
+ public void setUp() throws Exception {
+ // create a server with two handlers
+ server = new RPC.Builder(conf).setProtocol(Foo0.class)
+ .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+
+
+ // Add Protobuf server
+ // Create server side implementation
+ PBServerImpl pbServerImpl = new PBServerImpl();
+ BlockingService service = TestProtobufRpcProto
+ .newReflectiveBlockingService(pbServerImpl);
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+ service);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ }
+
@After
public void tearDown() throws Exception {
server.stop();
}
+ @Test
+ public void test1() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
+ Foo0 foo0 = (Foo0)proxy.getProxy();
+ Assert.assertEquals("Foo0", foo0.ping());
+
+
+ proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+
+
+ Foo1 foo1 = (Foo1)proxy.getProxy();
+ Assert.assertEquals("Foo1", foo1.ping());
+ Assert.assertEquals("Foo1", foo1.ping());
+
+
+ proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
+
+
+ Bar bar = (Bar)proxy.getProxy();
+ Assert.assertEquals(99, bar.echo(99));
+
+ // Now test Mixin class method
+
+ Mixin mixin = bar;
+ mixin.hello();
+ }
+
+
+ // Server does not implement the FooUnimplemented version of protocol Foo.
+ // See that calls to it fail.
+ @Test(expected=IOException.class)
+ public void testNonExistingProtocol() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ foo.ping();
+ }
+
+ /**
+ * getProtocolVersion of an unimplemented version should return highest version
+ * Similarly getProtocolSignature should work.
+ * @throws IOException
+ */
+ @Test
+ public void testNonExistingProtocol2() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ Assert.assertEquals(Foo1.versionID,
+ foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID));
+ foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID, 0);
+ }
+
+ @Test(expected=IOException.class)
+ public void testIncorrectServerCreation() throws IOException {
+ new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
+ .build();
+ }
+
// Now test a PB service - a server hosts both PB and Writable Rpcs.
@Test
public void testPBService() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
index 6d83d7d..969f728 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
@@ -26,6 +26,19 @@ import org.junit.Test;
public class TestRPCCallBenchmark {
@Test(timeout=20000)
+ public void testBenchmarkWithWritable() throws Exception {
+ int rc = ToolRunner.run(new RPCCallBenchmark(),
+ new String[] {
+ "--clientThreads", "30",
+ "--serverThreads", "30",
+ "--time", "5",
+ "--serverReaderThreads", "4",
+ "--messageSize", "1024",
+ "--engine", "writable"});
+ assertEquals(0, rc);
+ }
+
+ @Test(timeout=20000)
public void testBenchmarkWithProto() throws Exception {
int rc = ToolRunner.run(new RPCCallBenchmark(),
new String[] {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
index a06d9fd..2ac2be9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
@@ -18,20 +18,28 @@
package org.apache.hadoop.ipc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
/** Unit test for supporting method-name based compatible RPCs. */
public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0";
@@ -41,7 +49,7 @@ public class TestRPCCompatibility {
public static final Log LOG =
LogFactory.getLog(TestRPCCompatibility.class);
-
+
private static Configuration conf = new Configuration();
public interface TestProtocol0 extends VersionedProtocol {
@@ -112,21 +120,6 @@ public class TestRPCCompatibility {
@Before
public void setUp() {
ProtocolSignature.resetCache();
-
- RPC.setProtocolEngine(conf,
- TestProtocol0.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol1.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol2.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol3.class, ProtobufRpcEngine.class);
-
- RPC.setProtocolEngine(conf,
- TestProtocol4.class, ProtobufRpcEngine.class);
}
@After
@@ -140,7 +133,117 @@ public class TestRPCCompatibility {
server = null;
}
}
+
+ @Test // old client vs new server
+ public void testVersion0ClientVersion1Server() throws Exception {
+ // create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ proxy = RPC.getProtocolProxy(
+ TestProtocol0.class, TestProtocol0.versionID, addr, conf);
+
+ TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
+ proxy0.ping();
+ }
+
+ @Test // old client vs new server
+ public void testVersion1ClientVersion0Server() throws Exception {
+ // create a server with two handlers
+ server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
+ .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ proxy = RPC.getProtocolProxy(
+ TestProtocol1.class, TestProtocol1.versionID, addr, conf);
+
+ TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
+ proxy1.ping();
+ try {
+ proxy1.echo("hello");
+ fail("Echo should fail");
+ } catch(IOException e) {
+ }
+ }
+
+ private class Version2Client {
+ private TestProtocol2 proxy2;
+ private ProtocolProxy<TestProtocol2> serverInfo;
+
+ private Version2Client() throws IOException {
+ serverInfo = RPC.getProtocolProxy(
+ TestProtocol2.class, TestProtocol2.versionID, addr, conf);
+ proxy2 = serverInfo.getProxy();
+ }
+
+ public int echo(int value) throws IOException, NumberFormatException {
+ if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
+ return -value; // use version 3 echo long
+ } else { // server is version 2
+System.out.println("echo int is NOT supported");
+ return Integer.parseInt(proxy2.echo(String.valueOf(value)));
+ }
+ }
+
+ public String echo(String value) throws IOException {
+ return proxy2.echo(value);
+ }
+
+ public void ping() throws IOException {
+ proxy2.ping();
+ }
+ }
+
+ @Test // Compatible new client & old server
+ public void testVersion2ClientVersion1Server() throws Exception {
+ // create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+
+ Version2Client client = new Version2Client();
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // echo(int) is not supported by server, so returning 3
+ // This verifies that echo(int) and echo(String)'s hash codes are different
+ assertEquals(3, client.echo(3));
+ }
+
+ @Test // equal version client and server
+ public void testVersion2ClientVersion2Server() throws Exception {
+ // create a server with two handlers
+ TestImpl2 impl = new TestImpl2();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ Version2Client client = new Version2Client();
+
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // now that echo(int) is supported by the server, echo(int) should return -3
+ assertEquals(-3, client.echo(3));
+ }
+
public interface TestProtocol3 {
int echo(String value);
int echo(int value);
@@ -194,4 +297,97 @@ public class TestRPCCompatibility {
@Override
int echo(int value) throws IOException;
}
+
+ @Test
+ public void testVersionMismatch() throws IOException {
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+ TestProtocol4.versionID, addr, conf);
+ try {
+ proxy.echo(21);
+ fail("The call must throw VersionMismatch exception");
+ } catch (RemoteException ex) {
+ Assert.assertEquals(RPC.VersionMismatch.class.getName(),
+ ex.getClassName());
+ Assert.assertTrue(ex.getErrorCode().equals(
+ RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
+ } catch (IOException ex) {
+ fail("Expected version mismatch but got " + ex);
+ }
+ }
+
+ @Test
+ public void testIsMethodSupported() throws IOException {
+ server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+ .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+ TestProtocol2.versionID, addr, conf);
+ boolean supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertTrue(supported);
+ supported = RpcClientUtil.isMethodSupported(proxy,
+ TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(TestProtocol2.class), "echo");
+ Assert.assertFalse(supported);
+ }
+
+ /**
+ * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+ * the server registry to extract protocol signatures and versions.
+ */
+ @Test
+ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+ TestImpl1 impl = new TestImpl1();
+ server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+ .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+ .setVerbose(false).build();
+ server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+ server.start();
+
+ ProtocolMetaInfoServerSideTranslatorPB xlator =
+ new ProtocolMetaInfoServerSideTranslatorPB(server);
+
+ GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RPC.RpcKind.RPC_PROTOCOL_BUFFER));
+ //No signatures should be found
+ Assert.assertEquals(0, resp.getProtocolSignatureCount());
+ resp = xlator.getProtocolSignature(
+ null,
+ createGetProtocolSigRequestProto(TestProtocol1.class,
+ RPC.RpcKind.RPC_WRITABLE));
+ Assert.assertEquals(1, resp.getProtocolSignatureCount());
+ ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+ Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+ boolean found = false;
+ int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+ .getMethod("echo", String.class));
+ for (int m : sig.getMethodsList()) {
+ if (expected == m) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue(found);
+ }
+
+ private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+ Class<?> protocol, RPC.RpcKind rpcKind) {
+ GetProtocolSignatureRequestProto.Builder builder =
+ GetProtocolSignatureRequestProto.newBuilder();
+ builder.setProtocol(protocol.getName());
+ builder.setRpcKind(rpcKind.toString());
+ return builder.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
index b22f91b..5807998 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -28,13 +30,11 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
-
/**
* tests that the proxy can be interrupted
*/
-public class TestRPCWaitForProxy extends TestRpcBase {
+public class TestRPCWaitForProxy extends Assert {
+ private static final String ADDRESS = "0.0.0.0";
private static final Logger
LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
@@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase {
*
* @throws Throwable any exception other than that which was expected
*/
- @Test(timeout = 50000)
+ @Test(timeout = 10000)
public void testWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(0);
worker.start();
worker.join();
Throwable caught = worker.getCaught();
- Throwable cause = caught.getCause();
- Assert.assertNotNull("No exception was raised", cause);
- if (!(cause instanceof ConnectException)) {
+ assertNotNull("No exception was raised", caught);
+ if (!(caught instanceof ConnectException)) {
throw caught;
}
}
@@ -70,11 +69,11 @@ public class TestRPCWaitForProxy extends TestRpcBase {
RpcThread worker = new RpcThread(100);
worker.start();
Thread.sleep(1000);
- Assert.assertTrue("worker hasn't started", worker.waitStarted);
+ assertTrue("worker hasn't started", worker.waitStarted);
worker.interrupt();
worker.join();
Throwable caught = worker.getCaught();
- Assert.assertNotNull("No exception was raised", caught);
+ assertNotNull("No exception was raised", caught);
// looking for the root cause here, which can be wrapped
// as part of the NetUtils work. Having this test look
// a the type of exception there would be brittle to improvements
@@ -83,8 +82,6 @@ public class TestRPCWaitForProxy extends TestRpcBase {
if (cause == null) {
// no inner cause, use outer exception as root cause.
cause = caught;
- } else if (cause.getCause() != null) {
- cause = cause.getCause();
}
if (!(cause instanceof InterruptedIOException)
&& !(cause instanceof ClosedByInterruptException)) {
@@ -115,16 +112,12 @@ public class TestRPCWaitForProxy extends TestRpcBase {
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
connectRetries);
waitStarted = true;
-
- short invalidPort = 20;
- InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
- invalidPort);
- TestRpcBase.TestRpcService proxy = RPC.getProxy(
- TestRpcBase.TestRpcService.class,
- 1L, invalidAddress, conf);
- // Test echo method
- proxy.echo(null, newEchoRequest("hello"));
-
+ TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+ TestProtocol.versionID,
+ new InetSocketAddress(ADDRESS, 20),
+ config,
+ 15000L);
+ proxy.echo("");
} catch (Throwable throwable) {
caught = throwable;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 5a8f8d0..bc604a4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -112,8 +112,7 @@ public class TestRpcBase {
return setupTestServer(builder);
}
- protected static RPC.Server setupTestServer(
- RPC.Builder builder) throws IOException {
+ protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
RPC.Server server = builder.build();
server.start();
@@ -176,21 +175,17 @@ public class TestRpcBase {
public TestTokenIdentifier() {
this(new Text(), new Text());
}
-
public TestTokenIdentifier(Text tokenid) {
this(tokenid, new Text());
}
-
public TestTokenIdentifier(Text tokenid, Text realUser) {
this.tokenid = tokenid == null ? new Text() : tokenid;
this.realUser = realUser == null ? new Text() : realUser;
}
-
@Override
public Text getKind() {
return KIND_NAME;
}
-
@Override
public UserGroupInformation getUser() {
if (realUser.toString().isEmpty()) {
@@ -208,7 +203,6 @@ public class TestRpcBase {
tokenid.readFields(in);
realUser.readFields(in);
}
-
@Override
public void write(DataOutput out) throws IOException {
tokenid.write(out);
@@ -240,7 +234,7 @@ public class TestRpcBase {
@SuppressWarnings("unchecked")
@Override
public Token<TestTokenIdentifier> selectToken(Text service,
- Collection<Token<? extends TokenIdentifier>> tokens) {
+ Collection<Token<? extends TokenIdentifier>> tokens) {
if (service == null) {
return null;
}
@@ -394,17 +388,19 @@ public class TestRpcBase {
}
@Override
- public TestProtos.UserResponseProto getAuthUser(
+ public TestProtos.AuthUserResponseProto getAuthUser(
RpcController controller, TestProtos.EmptyRequestProto request)
throws ServiceException {
- UserGroupInformation authUser;
+ UserGroupInformation authUser = null;
try {
authUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new ServiceException(e);
}
- return newUserResponse(authUser.getUserName());
+ return TestProtos.AuthUserResponseProto.newBuilder()
+ .setAuthUser(authUser.getUserName())
+ .build();
}
@Override
@@ -436,34 +432,6 @@ public class TestRpcBase {
return TestProtos.EmptyResponseProto.newBuilder().build();
}
-
- @Override
- public TestProtos.UserResponseProto getCurrentUser(
- RpcController controller,
- TestProtos.EmptyRequestProto request) throws ServiceException {
- String user;
- try {
- user = UserGroupInformation.getCurrentUser().toString();
- } catch (IOException e) {
- throw new ServiceException("Failed to get current user", e);
- }
-
- return newUserResponse(user);
- }
-
- @Override
- public TestProtos.UserResponseProto getServerRemoteUser(
- RpcController controller,
- TestProtos.EmptyRequestProto request) throws ServiceException {
- String serverRemoteUser = Server.getRemoteUser().toString();
- return newUserResponse(serverRemoteUser);
- }
-
- private TestProtos.UserResponseProto newUserResponse(String user) {
- return TestProtos.UserResponseProto.newBuilder()
- .setUser(user)
- .build();
- }
}
protected static TestProtos.EmptyRequestProto newEmptyRequest() {
@@ -510,4 +478,8 @@ public class TestRpcBase {
}
return null;
}
+
+ protected static String convert(TestProtos.AuthUserResponseProto response) {
+ return response.getAuthUser();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
index c48ff2e..72371a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
@@ -45,55 +45,30 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import javax.security.auth.callback.*;
+import javax.security.sasl.*;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*;
+import static org.junit.Assert.*;
/** Unit tests for using Sasl over RPC. */
@RunWith(Parameterized.class)
public class TestSaslRPC extends TestRpcBase {
@Parameters
public static Collection<Object[]> data() {
- Collection<Object[]> params = new ArrayList<>();
+ Collection<Object[]> params = new ArrayList<Object[]>();
for (QualityOfProtection qop : QualityOfProtection.values()) {
params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null });
}
@@ -139,7 +114,7 @@ public class TestSaslRPC extends TestRpcBase {
NONE(),
VALID(),
INVALID(),
- OTHER()
+ OTHER();
}
@BeforeClass
@@ -255,7 +230,7 @@ public class TestSaslRPC extends TestRpcBase {
final Server server = setupTestServer(conf, 5, sm);
doDigestRpc(server, sm);
} finally {
- SecurityUtil.setSecurityInfoProviders();
+ SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
}
}
@@ -284,7 +259,7 @@ public class TestSaslRPC extends TestRpcBase {
addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()));
- Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId, sm);
SecurityUtil.setTokenService(token, addr);
current.addToken(token);
@@ -321,8 +296,8 @@ public class TestSaslRPC extends TestRpcBase {
// set doPing to true
newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
- ConnectionId remoteId = ConnectionId.getConnectionId(
- new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf);
+ ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
+ TestRpcService.class, null, 0, null, newConf);
assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
remoteId.getPingInterval());
// set doPing to false
@@ -831,13 +806,13 @@ public class TestSaslRPC extends TestRpcBase {
final TestTokenSecretManager sm = new TestTokenSecretManager();
boolean useSecretManager = (serverAuth != SIMPLE);
if (enableSecretManager != null) {
- useSecretManager &= enableSecretManager;
+ useSecretManager &= enableSecretManager.booleanValue();
}
if (forceSecretManager != null) {
- useSecretManager |= forceSecretManager;
+ useSecretManager |= forceSecretManager.booleanValue();
}
final SecretManager<?> serverSm = useSecretManager ? sm : null;
-
+
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
@Override
public Server run() throws IOException {
@@ -892,13 +867,13 @@ public class TestSaslRPC extends TestRpcBase {
proxy.ping(null, newEmptyRequest());
// make sure the other side thinks we are who we said we are!!!
assertEquals(clientUgi.getUserName(),
- proxy.getAuthUser(null, newEmptyRequest()).getUser());
+ convert(proxy.getAuthUser(null, newEmptyRequest())));
AuthMethod authMethod =
convert(proxy.getAuthMethod(null, newEmptyRequest()));
// verify sasl completed with correct QOP
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
- RPC.getConnectionIdForProxy(proxy).getSaslQop());
- return authMethod != null ? authMethod.toString() : null;
+ RPC.getConnectionIdForProxy(proxy).getSaslQop());
+ return authMethod.toString();
} catch (ServiceException se) {
if (se.getCause() instanceof RemoteException) {
throw (RemoteException) se.getCause();
@@ -923,18 +898,21 @@ public class TestSaslRPC extends TestRpcBase {
String actual) {
assertEquals(expect.toString(), actual);
}
-
- private static void assertAuthEquals(Pattern expect, String actual) {
+
+ private static void assertAuthEquals(Pattern expect,
+ String actual) {
// this allows us to see the regexp and the value it didn't match
if (!expect.matcher(actual).matches()) {
- fail(); // it failed
+ assertEquals(expect, actual); // it failed
+ } else {
+ assertTrue(true); // it matched
}
}
/*
* Class used to test overriding QOP values using SaslPropertiesResolver
*/
- static class AuthSaslPropertiesResolver extends SaslPropertiesResolver {
+ static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{
@Override
public Map<String, String> getServerProperties(InetAddress address) {
@@ -943,7 +921,7 @@ public class TestSaslRPC extends TestRpcBase {
return newPropertes;
}
}
-
+
public static void main(String[] args) throws Exception {
System.out.println("Testing Kerberos authentication over RPC");
if (args.length != 2) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
index c4dbcac..50d389c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
@@ -17,35 +17,40 @@
*/
package org.apache.hadoop.security;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import org.junit.Assert;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.TestRpcBase;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+import org.apache.hadoop.security.token.TokenInfo;
import org.junit.Before;
import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Enumeration;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector;
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
/**
- * Test do as effective user.
+ *
*/
-public class TestDoAsEffectiveUser extends TestRpcBase {
+public class TestDoAsEffectiveUser {
final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG";
final private static String REAL_USER_SHORT_NAME = "realUser1";
final private static String PROXY_USER_NAME = "proxyUser";
@@ -53,8 +58,8 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
final private static String GROUP2_NAME = "group2";
final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME,
GROUP2_NAME };
-
- private TestRpcService client;
+ private static final String ADDRESS = "0.0.0.0";
+ private TestProtocol proxy;
private static final Configuration masterConf = new Configuration();
@@ -77,7 +82,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
private void configureSuperUserIPAddresses(Configuration conf,
String superUserShortName) throws IOException {
- ArrayList<String> ipList = new ArrayList<>();
+ ArrayList<String> ipList = new ArrayList<String>();
Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
.getNetworkInterfaces();
while (netInterfaceList.hasMoreElements()) {
@@ -125,19 +130,50 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
curUGI.toString());
}
- private void checkRemoteUgi(final UserGroupInformation ugi,
- final Configuration conf) throws Exception {
+ @TokenInfo(TestTokenSelector.class)
+ public interface TestProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
+ String aMethod() throws IOException;
+ String getServerRemoteUser() throws IOException;
+ }
+
+ public class TestImpl implements TestProtocol {
+
+ @Override
+ public String aMethod() throws IOException {
+ return UserGroupInformation.getCurrentUser().toString();
+ }
+
+ @Override
+ public String getServerRemoteUser() throws IOException {
+ return Server.getRemoteUser().toString();
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return TestProtocol.versionID;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return new ProtocolSignature(TestProtocol.versionID, null);
+ }
+ }
+
+ private void checkRemoteUgi(final Server server,
+ final UserGroupInformation ugi, final Configuration conf)
+ throws Exception {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
- public Void run() throws ServiceException {
- client = getClient(addr, conf);
- String currentUser = client.getCurrentUser(null,
- newEmptyRequest()).getUser();
- String serverRemoteUser = client.getServerRemoteUser(null,
- newEmptyRequest()).getUser();
-
- Assert.assertEquals(ugi.toString(), currentUser);
- Assert.assertEquals(ugi.toString(), serverRemoteUser);
+ public Void run() throws IOException {
+ proxy = RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID,
+ NetUtils.getConnectAddress(server), conf);
+ Assert.assertEquals(ugi.toString(), proxy.aMethod());
+ Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser());
return null;
}
});
@@ -149,27 +185,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
- // Set RPC engine to protobuf RPC engine
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 5);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(5).setVerbose(true).build();
refreshConf(conf);
try {
+ server.start();
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
- checkRemoteUgi(realUserUgi, conf);
+ checkRemoteUgi(server, realUserUgi, conf);
- UserGroupInformation proxyUserUgi =
- UserGroupInformation.createProxyUserForTesting(
+ UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
- checkRemoteUgi(proxyUserUgi, conf);
+ checkRemoteUgi(server, proxyUserUgi, conf);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -180,25 +218,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 5);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
refreshConf(conf);
try {
+ server.start();
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
- checkRemoteUgi(realUserUgi, conf);
+ checkRemoteUgi(server, realUserUgi, conf);
UserGroupInformation proxyUserUgi = UserGroupInformation
.createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
- checkRemoteUgi(proxyUserUgi, conf);
+ checkRemoteUgi(server, proxyUserUgi, conf);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -214,14 +256,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group1");
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 5);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
refreshConf(conf);
try {
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@@ -230,10 +275,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
- public String run() throws ServiceException {
- client = getClient(addr, conf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ public String run() throws IOException {
+ proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+ String ret = proxy.aMethod();
+ return ret;
}
});
@@ -241,7 +287,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
} catch (Exception e) {
e.printStackTrace();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -250,14 +299,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
final Configuration conf = new Configuration();
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 2);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
refreshConf(conf);
try {
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@@ -266,10 +318,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
- public String run() throws ServiceException {
- client = getClient(addr, conf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ public String run() throws IOException {
+ proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+ String ret = proxy.aMethod();
+ return ret;
}
});
@@ -277,7 +330,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
} catch (Exception e) {
e.printStackTrace();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -285,12 +341,15 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
public void testRealUserGroupNotSpecified() throws IOException {
final Configuration conf = new Configuration();
configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 2);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
try {
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@@ -299,10 +358,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
- public String run() throws ServiceException {
- client = getClient(addr, conf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ public String run() throws IOException {
+ proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+ String ret = proxy.aMethod();
+ return ret;
}
});
@@ -310,7 +370,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
} catch (Exception e) {
e.printStackTrace();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -321,14 +384,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
conf.setStrings(DefaultImpersonationProvider.getTestProvider().
getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
"group3");
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 2);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(2).setVerbose(false).build();
refreshConf(conf);
try {
+ server.start();
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
@@ -337,10 +403,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
- public String run() throws ServiceException {
- client = getClient(addr, conf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ public String run() throws IOException {
+ proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+ String ret = proxy.aMethod();
+ return ret;
}
});
@@ -348,7 +415,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
} catch (Exception e) {
e.printStackTrace();
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
@@ -362,17 +432,20 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
final Configuration conf = new Configuration(masterConf);
TestTokenSecretManager sm = new TestTokenSecretManager();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(conf);
- final Server server = setupTestServer(conf, 5, sm);
+ final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+ .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
+
+ server.start();
final UserGroupInformation current = UserGroupInformation
.createRemoteUser(REAL_USER_NAME);
-
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser"));
- Token<TestTokenIdentifier> token = new Token<>(tokenId,
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
sm);
SecurityUtil.setTokenService(token, addr);
UserGroupInformation proxyUserUgi = UserGroupInformation
@@ -380,19 +453,23 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
proxyUserUgi.addToken(token);
refreshConf(conf);
-
+
String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
- client = getClient(addr, conf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+ String ret = proxy.aMethod();
+ return ret;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
});
@@ -409,34 +486,42 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
TestTokenSecretManager sm = new TestTokenSecretManager();
final Configuration newConf = new Configuration(masterConf);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
- // Set RPC engine to protobuf RPC engine
- RPC.setProtocolEngine(newConf, TestRpcService.class,
- ProtobufRpcEngine.class);
UserGroupInformation.setConfiguration(newConf);
- final Server server = setupTestServer(newConf, 5, sm);
+ final Server server = new RPC.Builder(newConf)
+ .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+ .setSecretManager(sm).build();
+
+ server.start();
final UserGroupInformation current = UserGroupInformation
.createUserForTesting(REAL_USER_NAME, GROUP_NAMES);
refreshConf(newConf);
-
+
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()), new Text("SomeSuperUser"));
- Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
+ sm);
SecurityUtil.setTokenService(token, addr);
current.addToken(token);
String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
- client = getClient(addr, newConf);
- return client.getCurrentUser(null,
- newEmptyRequest()).getUser();
+ proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, newConf);
+ String ret = proxy.aMethod();
+ return ret;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
- stop(server, client);
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
}
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
index e45d70d..b3ea5f2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -29,11 +28,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
@@ -55,22 +50,9 @@ import java.util.Set;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
-import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
-import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
-import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.ipc.TestSaslRPC.*;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -127,7 +109,7 @@ public class TestUserGroupInformation {
UserGroupInformation.setLoginUser(null);
}
- @Test(timeout = 30000)
+ @Test (timeout = 30000)
public void testSimpleLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/proto/test.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
index 6411f97..99cd93d 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
@@ -88,6 +88,6 @@ message AuthMethodResponseProto {
required string mechanismName = 2;
}
-message UserResponseProto {
- required string user = 1;
+message AuthUserResponseProto {
+ required string authUser = 1;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
index 06f6c4f..3292115 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
@@ -40,11 +40,9 @@ service TestProtobufRpcProto {
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
- rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
+ rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
- rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
- rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
}
service TestProtobufRpc2Proto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57f7cb1..6b52949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -168,6 +168,7 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node;
@@ -316,6 +317,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
+
+ WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
new file mode 100644
index 0000000..0b7ee33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.mockito.Mockito.mock;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/** Unit tests for using Delegation Token over RPC. */
+public class TestClientProtocolWithDelegationToken {
+ private static final String ADDRESS = "0.0.0.0";
+
+ public static final Log LOG = LogFactory
+ .getLog(TestClientProtocolWithDelegationToken.class);
+
+ private static final Configuration conf;
+ static {
+ conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ static {
+ GenericTestUtils.setLogLevel(Client.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(Server.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
+ }
+
+ @Test
+ public void testDelegationTokenRpc() throws Exception {
+ ClientProtocol mockNN = mock(ClientProtocol.class);
+ FSNamesystem mockNameSys = mock(FSNamesystem.class);
+
+ DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+ 3600000, mockNameSys);
+ sm.startThreads();
+ final Server server = new RPC.Builder(conf)
+ .setProtocol(ClientProtocol.class).setInstance(mockNN)
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+ .setSecretManager(sm).build();
+
+ server.start();
+
+ final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ String user = current.getUserName();
+ Text owner = new Text(user);
+ DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
+ Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+ dtId, sm);
+ SecurityUtil.setTokenService(token, addr);
+ LOG.info("Service for token is " + token.getService());
+ current.addToken(token);
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ ClientProtocol proxy = null;
+ try {
+ proxy = RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, addr, conf);
+ proxy.getServerDefaults();
+ } finally {
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
index 729af0a..3fef5e2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@@ -97,6 +98,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
.newReflectiveBlockingService(refreshHSAdminProtocolXlator);
+ WritableRpcEngine.ensureInitialized();
+
clientRpcAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.JHS_ADMIN_ADDRESS,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org