You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/13 08:16:29 UTC

[05/32] hadoop git commit: Revert "HADOOP-13218. Migrate other Hadoop side tests to prepare for removing WritableRPCEngine. Contributed by Wei Zhou and Kai Zheng"

Revert "HADOOP-13218. Migrate other Hadoop side tests to prepare for removing WritableRPCEngine. Contributed by Wei Zhou and Kai Zheng"

This reverts commit 62a9667136ebd8a048f556b534fcff4fdaf8e2ec


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d355573f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d355573f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d355573f

Branch: refs/heads/HADOOP-12756
Commit: d355573f5681f43e760a1bc23ebed553bd35fca5
Parents: f414d5e
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Sep 8 05:50:17 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Sep 8 05:50:17 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |   5 +-
 .../main/java/org/apache/hadoop/ipc/RPC.java    |  15 +-
 .../main/java/org/apache/hadoop/ipc/Server.java |   4 +-
 .../hadoop/security/UserGroupInformation.java   |   4 +-
 .../org/apache/hadoop/ipc/RPCCallBenchmark.java |  38 ++-
 .../hadoop/ipc/TestMultipleProtocolServer.java  | 236 ++++++++++++++-
 .../apache/hadoop/ipc/TestRPCCallBenchmark.java |  13 +
 .../apache/hadoop/ipc/TestRPCCompatibility.java | 242 +++++++++++++--
 .../apache/hadoop/ipc/TestRPCWaitForProxy.java  |  37 +--
 .../java/org/apache/hadoop/ipc/TestRpcBase.java |  50 +---
 .../java/org/apache/hadoop/ipc/TestSaslRPC.java |  74 ++---
 .../hadoop/security/TestDoAsEffectiveUser.java  | 291 ++++++++++++-------
 .../security/TestUserGroupInformation.java      |  28 +-
 .../hadoop-common/src/test/proto/test.proto     |   4 +-
 .../src/test/proto/test_rpc_service.proto       |   4 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   3 +
 .../TestClientProtocolWithDelegationToken.java  | 119 ++++++++
 .../mapreduce/v2/hs/server/HSAdminServer.java   |   3 +
 18 files changed, 876 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index e68bfd4..83e4b9e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -60,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine {
   private static final ThreadLocal<AsyncGet<Message, Exception>>
       ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
 
-  static { // Register the rpcRequest deserializer for ProtobufRpcEngine
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
         new Server.ProtoBufRpcInvoker());
@@ -194,8 +194,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (args.length != 2) { // RpcController + Message
-        throw new ServiceException(
-            "Too many or few parameters for request. Method: ["
+        throw new ServiceException("Too many parameters for request. Method: ["
             + method.getName() + "]" + ", Expected: 2, Actual: "
             + args.length);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 12a07a5..3f68d63 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
@@ -28,6 +26,7 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
+import java.io.*;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,12 +37,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.*;
+
 import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
@@ -56,6 +54,7 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -88,7 +87,7 @@ public class RPC {
     RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
     RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
     final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
-    private final short value;
+    public final short value; //TODO make it private
 
     RpcKind(short val) {
       this.value = val;
@@ -208,7 +207,7 @@ public class RPC {
     RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
     if (engine == null) {
       Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
-                                    ProtobufRpcEngine.class);
+                                    WritableRpcEngine.class);
       engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
       PROTOCOL_ENGINES.put(protocol, engine);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 531d574..f20ba94 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -237,14 +237,14 @@ public abstract class Server {
   static class RpcKindMapValue {
     final Class<? extends Writable> rpcRequestWrapperClass;
     final RpcInvoker rpcInvoker;
-
     RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
           RpcInvoker rpcInvoker) {
       this.rpcInvoker = rpcInvoker;
       this.rpcRequestWrapperClass = rpcRequestWrapperClass;
     }   
   }
-  static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<>(4);
+  static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RPC.RpcKind, RpcKindMapValue>(4);
   
   
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index ed3a9d0..0ad9abc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -730,7 +730,7 @@ public class UserGroupInformation {
    * 
    * @param user                The principal name to load from the ticket
    *                            cache
-   * @param ticketCache     the path to the ticket cache file
+   * @param ticketCachePath     the path to the ticket cache file
    *
    * @throws IOException        if the kerberos login fails
    */
@@ -790,7 +790,7 @@ public class UserGroupInformation {
   /**
    * Create a UserGroupInformation from a Subject with Kerberos principal.
    *
-   * @param subject             The KerberosPrincipal to use in UGI
+   * @param user                The KerberosPrincipal to use in UGI
    *
    * @throws IOException        if the kerberos login fails
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
index 9356dab..eb7b949 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.ipc;
 
-import com.google.common.base.Joiner;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -29,6 +34,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@@ -39,12 +45,8 @@ import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
 
 /**
  * Benchmark for protobuf RPC.
@@ -66,7 +68,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
     public int secondsToRun = 15;
     private int msgSize = 1024;
     public Class<? extends RpcEngine> rpcEngine =
-        ProtobufRpcEngine.class;
+      WritableRpcEngine.class;
     
     private MyOptions(String args[]) {
       try {
@@ -133,7 +135,7 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
       
       opts.addOption(
           OptionBuilder.withLongOpt("engine").hasArg(true)
-          .withArgName("protobuf")
+          .withArgName("writable|protobuf")
           .withDescription("engine to use")
           .create('e'));
       
@@ -182,6 +184,8 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
         String eng = line.getOptionValue('e');
         if ("protobuf".equals(eng)) {
           rpcEngine = ProtobufRpcEngine.class;
+        } else if ("writable".equals(eng)) {
+          rpcEngine = WritableRpcEngine.class;
         } else {
           throw new ParseException("invalid engine: " + eng);
         }
@@ -233,6 +237,11 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
       server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
           .setInstance(service).setBindAddress(opts.host).setPort(opts.getPort())
           .setNumHandlers(opts.serverThreads).setVerbose(false).build();
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+          .setInstance(new TestRPC.TestImpl()).setBindAddress(opts.host)
+          .setPort(opts.getPort()).setNumHandlers(opts.serverThreads)
+          .setVerbose(false).build();
     } else {
       throw new RuntimeException("Bad engine: " + opts.rpcEngine);
     }
@@ -390,6 +399,15 @@ public class RPCCallBenchmark extends TestRpcBase implements Tool {
           return responseProto.getMessage();
         }
       };
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      final TestProtocol proxy = RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      return new RpcServiceWrapper() {
+        @Override
+        public String doEcho(String msg) throws Exception {
+          return proxy.echo(msg);
+        }
+      };
     } else {
       throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
index 10e23ba..8b419e3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
@@ -17,28 +17,252 @@
  */
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
 import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
 import org.junit.Before;
+import org.junit.After;
 import org.junit.Test;
+import com.google.protobuf.BlockingService;
 
 public class TestMultipleProtocolServer extends TestRpcBase {
-
+  private static InetSocketAddress addr;
   private static RPC.Server server;
 
-  @Before
-  public void setUp() throws Exception {
-    super.setupConf();
+  private static Configuration conf = new Configuration();
+  
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo0 extends VersionedProtocol {
+    public static final long versionID = 0L;
+    String ping() throws IOException;
+    
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo1 extends VersionedProtocol {
+    public static final long versionID = 1L;
+    String ping() throws IOException;
+    String ping2() throws IOException;
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface FooUnimplemented extends VersionedProtocol {
+    public static final long versionID = 2L;
+    String ping() throws IOException;  
+  }
+  
+  interface Mixin extends VersionedProtocol{
+    public static final long versionID = 0L;
+    void hello() throws IOException;
+  }
+
+  interface Bar extends Mixin {
+    public static final long versionID = 0L;
+    int echo(int i) throws IOException;
+  }
+  
+  class Foo0Impl implements Foo0 {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo0.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
 
-    server = setupTestServer(conf, 2);
+    @Override
+    public String ping() {
+      return "Foo0";     
+    }
+    
   }
+  
+  class Foo1Impl implements Foo1 {
 
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo1.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                        getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public String ping() {
+      return "Foo1";
+    }
+
+    @Override
+    public String ping2() {
+      return "Foo1";
+      
+    }
+    
+  }
+
+  
+  class BarImpl implements Bar {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Bar.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public int echo(int i) {
+      return i;
+    }
+
+    @Override
+    public void hello() {
+
+      
+    }
+  }
+  @Before
+  public void setUp() throws Exception {
+    // create a server with two handlers
+    server = new RPC.Builder(conf).setProtocol(Foo0.class)
+        .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
+    
+    
+    // Add Protobuf server
+    // Create server side implementation
+    PBServerImpl pbServerImpl = new PBServerImpl();
+    BlockingService service = TestProtobufRpcProto
+        .newReflectiveBlockingService(pbServerImpl);
+    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
+        service);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+  }
+  
   @After
   public void tearDown() throws Exception {
     server.stop();
   }
 
+  @Test
+  public void test1() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
 
+    Foo0 foo0 = (Foo0)proxy.getProxy(); 
+    Assert.assertEquals("Foo0", foo0.ping());
+    
+    
+    proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+    
+    
+    Foo1 foo1 = (Foo1)proxy.getProxy(); 
+    Assert.assertEquals("Foo1", foo1.ping());
+    Assert.assertEquals("Foo1", foo1.ping());
+    
+    
+    proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
+    
+    
+    Bar bar = (Bar)proxy.getProxy(); 
+    Assert.assertEquals(99, bar.echo(99));
+    
+    // Now test Mixin class method
+    
+    Mixin mixin = bar;
+    mixin.hello();
+  }
+  
+  
+  // Server does not implement the FooUnimplemented version of protocol Foo.
+  // See that calls to it fail.
+  @Test(expected=IOException.class)
+  public void testNonExistingProtocol() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    foo.ping();
+  }
+
+  /**
+   * getProtocolVersion of an unimplemented version should return highest version
+   * Similarly getProtocolSignature should work.
+   * @throws IOException
+   */
+  @Test
+  public void testNonExistingProtocol2() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    Assert.assertEquals(Foo1.versionID, 
+        foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID));
+    foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID, 0);
+  }
+  
+  @Test(expected=IOException.class)
+  public void testIncorrectServerCreation() throws IOException {
+    new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
+        .build();
+  } 
+  
   // Now test a PB service - a server  hosts both PB and Writable Rpcs.
   @Test
   public void testPBService() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
index 6d83d7d..969f728 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java
@@ -26,6 +26,19 @@ import org.junit.Test;
 public class TestRPCCallBenchmark {
 
   @Test(timeout=20000)
+  public void testBenchmarkWithWritable() throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(),
+        new String[] {
+      "--clientThreads", "30",
+      "--serverThreads", "30",
+      "--time", "5",
+      "--serverReaderThreads", "4",
+      "--messageSize", "1024",
+      "--engine", "writable"});
+    assertEquals(0, rc);
+  }
+  
+  @Test(timeout=20000)
   public void testBenchmarkWithProto() throws Exception {
     int rc = ToolRunner.run(new RPCCallBenchmark(),
         new String[] {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
index a06d9fd..2ac2be9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
@@ -18,20 +18,28 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
+import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.net.NetUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
 /** Unit test for supporting method-name based compatible RPCs. */
 public class TestRPCCompatibility {
   private static final String ADDRESS = "0.0.0.0";
@@ -41,7 +49,7 @@ public class TestRPCCompatibility {
 
   public static final Log LOG =
     LogFactory.getLog(TestRPCCompatibility.class);
-
+  
   private static Configuration conf = new Configuration();
 
   public interface TestProtocol0 extends VersionedProtocol {
@@ -112,21 +120,6 @@ public class TestRPCCompatibility {
   @Before
   public void setUp() {
     ProtocolSignature.resetCache();
-
-    RPC.setProtocolEngine(conf,
-        TestProtocol0.class, ProtobufRpcEngine.class);
-
-    RPC.setProtocolEngine(conf,
-        TestProtocol1.class, ProtobufRpcEngine.class);
-
-    RPC.setProtocolEngine(conf,
-        TestProtocol2.class, ProtobufRpcEngine.class);
-
-    RPC.setProtocolEngine(conf,
-        TestProtocol3.class, ProtobufRpcEngine.class);
-
-    RPC.setProtocolEngine(conf,
-        TestProtocol4.class, ProtobufRpcEngine.class);
   }
   
   @After
@@ -140,7 +133,117 @@ public class TestRPCCompatibility {
       server = null;
     }
   }
+  
+  @Test  // old client vs new server
+  public void testVersion0ClientVersion1Server() throws Exception {
+    // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
+    server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+        .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+        .setVerbose(false).build();
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    proxy = RPC.getProtocolProxy(
+        TestProtocol0.class, TestProtocol0.versionID, addr, conf);
+
+    TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
+    proxy0.ping();
+  }
+  
+  @Test  // old client vs new server
+  public void testVersion1ClientVersion0Server() throws Exception {
+    // create a server with two handlers
+    server = new RPC.Builder(conf).setProtocol(TestProtocol0.class)
+        .setInstance(new TestImpl0()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    proxy = RPC.getProtocolProxy(
+        TestProtocol1.class, TestProtocol1.versionID, addr, conf);
+
+    TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
+    proxy1.ping();
+    try {
+      proxy1.echo("hello");
+      fail("Echo should fail");
+    } catch(IOException e) {
+    }
+  }
+  
+  private class Version2Client {
 
+    private TestProtocol2 proxy2;
+    private ProtocolProxy<TestProtocol2> serverInfo;
+    
+    private Version2Client() throws IOException {
+      serverInfo =  RPC.getProtocolProxy(
+          TestProtocol2.class, TestProtocol2.versionID, addr, conf);
+      proxy2 = serverInfo.getProxy();
+    }
+    
+    public int echo(int value) throws IOException, NumberFormatException {
+      if (serverInfo.isMethodSupported("echo", int.class)) {
+System.out.println("echo int is supported");
+        return -value;  // use version 3 echo long
+      } else { // server is version 2
+System.out.println("echo int is NOT supported");
+        return Integer.parseInt(proxy2.echo(String.valueOf(value)));
+      }
+    }
+
+    public String echo(String value) throws IOException {
+      return proxy2.echo(value);
+    }
+
+    public void ping() throws IOException {
+      proxy2.ping();
+    }
+  }
+
+  @Test // Compatible new client & old server
+  public void testVersion2ClientVersion1Server() throws Exception {
+    // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
+    server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+        .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+        .setVerbose(false).build();
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+
+    Version2Client client = new Version2Client();
+    client.ping();
+    assertEquals("hello", client.echo("hello"));
+    
+    // echo(int) is not supported by server, so returning 3
+    // This verifies that echo(int) and echo(String)'s hash codes are different
+    assertEquals(3, client.echo(3));
+  }
+  
+  @Test // equal version client and server
+  public void testVersion2ClientVersion2Server() throws Exception {
+    // create a server with two handlers
+    TestImpl2 impl = new TestImpl2();
+    server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+        .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+        .setVerbose(false).build();
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    Version2Client client = new Version2Client();
+
+    client.ping();
+    assertEquals("hello", client.echo("hello"));
+    
+    // now that echo(int) is supported by the server, echo(int) should return -3
+    assertEquals(-3, client.echo(3));
+  }
+  
   public interface TestProtocol3 {
     int echo(String value);
     int echo(int value);
@@ -194,4 +297,97 @@ public class TestRPCCompatibility {
     @Override
     int echo(int value)  throws IOException;
   }
+  
+  @Test
+  public void testVersionMismatch() throws IOException {
+    server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+        .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+        TestProtocol4.versionID, addr, conf);
+    try {
+      proxy.echo(21);
+      fail("The call must throw VersionMismatch exception");
+    } catch (RemoteException ex) {
+      Assert.assertEquals(RPC.VersionMismatch.class.getName(), 
+          ex.getClassName());
+      Assert.assertTrue(ex.getErrorCode().equals(
+          RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
+    }  catch (IOException ex) {
+      fail("Expected version mismatch but got " + ex);
+    }
+  }
+  
+  @Test
+  public void testIsMethodSupported() throws IOException {
+    server = new RPC.Builder(conf).setProtocol(TestProtocol2.class)
+        .setInstance(new TestImpl2()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+
+    TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
+        TestProtocol2.versionID, addr, conf);
+    boolean supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertTrue(supported);
+    supported = RpcClientUtil.isMethodSupported(proxy,
+        TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(TestProtocol2.class), "echo");
+    Assert.assertFalse(supported);
+  }
+
+  /**
+   * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
+   * the server registry to extract protocol signatures and versions.
+   */
+  @Test
+  public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
+    TestImpl1 impl = new TestImpl1();
+    server = new RPC.Builder(conf).setProtocol(TestProtocol1.class)
+        .setInstance(impl).setBindAddress(ADDRESS).setPort(0).setNumHandlers(2)
+        .setVerbose(false).build();
+    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
+    server.start();
+
+    ProtocolMetaInfoServerSideTranslatorPB xlator = 
+        new ProtocolMetaInfoServerSideTranslatorPB(server);
+
+    GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RPC.RpcKind.RPC_PROTOCOL_BUFFER));
+    //No signatures should be found
+    Assert.assertEquals(0, resp.getProtocolSignatureCount());
+    resp = xlator.getProtocolSignature(
+        null,
+        createGetProtocolSigRequestProto(TestProtocol1.class,
+            RPC.RpcKind.RPC_WRITABLE));
+    Assert.assertEquals(1, resp.getProtocolSignatureCount());
+    ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
+    Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
+    boolean found = false;
+    int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
+        .getMethod("echo", String.class));
+    for (int m : sig.getMethodsList()) {
+      if (expected == m) {
+        found = true;
+        break;
+      }
+    }
+    Assert.assertTrue(found);
+  }
+  
+  private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
+      Class<?> protocol, RPC.RpcKind rpcKind) {
+    GetProtocolSignatureRequestProto.Builder builder = 
+        GetProtocolSignatureRequestProto.newBuilder();
+    builder.setProtocol(protocol.getName());
+    builder.setRpcKind(rpcKind.toString());
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
index b22f91b..5807998 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -28,13 +30,11 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedByInterruptException;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
-
 /**
  * tests that the proxy can be interrupted
  */
-public class TestRPCWaitForProxy extends TestRpcBase {
+public class TestRPCWaitForProxy extends Assert {
+  private static final String ADDRESS = "0.0.0.0";
   private static final Logger
       LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
 
@@ -46,15 +46,14 @@ public class TestRPCWaitForProxy extends TestRpcBase {
    *
    * @throws Throwable any exception other than that which was expected
    */
-  @Test(timeout = 50000)
+  @Test(timeout = 10000)
   public void testWaitForProxy() throws Throwable {
     RpcThread worker = new RpcThread(0);
     worker.start();
     worker.join();
     Throwable caught = worker.getCaught();
-    Throwable cause = caught.getCause();
-    Assert.assertNotNull("No exception was raised", cause);
-    if (!(cause instanceof ConnectException)) {
+    assertNotNull("No exception was raised", caught);
+    if (!(caught instanceof ConnectException)) {
       throw caught;
     }
   }
@@ -70,11 +69,11 @@ public class TestRPCWaitForProxy extends TestRpcBase {
     RpcThread worker = new RpcThread(100);
     worker.start();
     Thread.sleep(1000);
-    Assert.assertTrue("worker hasn't started", worker.waitStarted);
+    assertTrue("worker hasn't started", worker.waitStarted);
     worker.interrupt();
     worker.join();
     Throwable caught = worker.getCaught();
-    Assert.assertNotNull("No exception was raised", caught);
+    assertNotNull("No exception was raised", caught);
     // looking for the root cause here, which can be wrapped
     // as part of the NetUtils work. Having this test look
     // a the type of exception there would be brittle to improvements
@@ -83,8 +82,6 @@ public class TestRPCWaitForProxy extends TestRpcBase {
     if (cause == null) {
       // no inner cause, use outer exception as root cause.
       cause = caught;
-    } else if (cause.getCause() != null) {
-      cause = cause.getCause();
     }
     if (!(cause instanceof InterruptedIOException)
         && !(cause instanceof ClosedByInterruptException)) {
@@ -115,16 +112,12 @@ public class TestRPCWaitForProxy extends TestRpcBase {
             IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
             connectRetries);
         waitStarted = true;
-
-        short invalidPort = 20;
-        InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
-            invalidPort);
-        TestRpcBase.TestRpcService proxy = RPC.getProxy(
-            TestRpcBase.TestRpcService.class,
-            1L, invalidAddress, conf);
-        // Test echo method
-        proxy.echo(null, newEchoRequest("hello"));
-
+        TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+            TestProtocol.versionID,
+            new InetSocketAddress(ADDRESS, 20),
+            config,
+            15000L);
+        proxy.echo("");
       } catch (Throwable throwable) {
         caught = throwable;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 5a8f8d0..bc604a4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -112,8 +112,7 @@ public class TestRpcBase {
     return setupTestServer(builder);
   }
 
-  protected static RPC.Server setupTestServer(
-      RPC.Builder builder) throws IOException {
+  protected static RPC.Server setupTestServer(RPC.Builder builder) throws IOException {
     RPC.Server server = builder.build();
 
     server.start();
@@ -176,21 +175,17 @@ public class TestRpcBase {
     public TestTokenIdentifier() {
       this(new Text(), new Text());
     }
-
     public TestTokenIdentifier(Text tokenid) {
       this(tokenid, new Text());
     }
-
     public TestTokenIdentifier(Text tokenid, Text realUser) {
       this.tokenid = tokenid == null ? new Text() : tokenid;
       this.realUser = realUser == null ? new Text() : realUser;
     }
-
     @Override
     public Text getKind() {
       return KIND_NAME;
     }
-
     @Override
     public UserGroupInformation getUser() {
       if (realUser.toString().isEmpty()) {
@@ -208,7 +203,6 @@ public class TestRpcBase {
       tokenid.readFields(in);
       realUser.readFields(in);
     }
-
     @Override
     public void write(DataOutput out) throws IOException {
       tokenid.write(out);
@@ -240,7 +234,7 @@ public class TestRpcBase {
     @SuppressWarnings("unchecked")
     @Override
     public Token<TestTokenIdentifier> selectToken(Text service,
-                      Collection<Token<? extends TokenIdentifier>> tokens) {
+                                                  Collection<Token<? extends TokenIdentifier>> tokens) {
       if (service == null) {
         return null;
       }
@@ -394,17 +388,19 @@ public class TestRpcBase {
     }
 
     @Override
-    public TestProtos.UserResponseProto getAuthUser(
+    public TestProtos.AuthUserResponseProto getAuthUser(
         RpcController controller, TestProtos.EmptyRequestProto request)
         throws ServiceException {
-      UserGroupInformation authUser;
+      UserGroupInformation authUser = null;
       try {
         authUser = UserGroupInformation.getCurrentUser();
       } catch (IOException e) {
         throw new ServiceException(e);
       }
 
-      return newUserResponse(authUser.getUserName());
+      return TestProtos.AuthUserResponseProto.newBuilder()
+          .setAuthUser(authUser.getUserName())
+          .build();
     }
 
     @Override
@@ -436,34 +432,6 @@ public class TestRpcBase {
 
       return TestProtos.EmptyResponseProto.newBuilder().build();
     }
-
-    @Override
-    public TestProtos.UserResponseProto getCurrentUser(
-        RpcController controller,
-        TestProtos.EmptyRequestProto request) throws ServiceException {
-      String user;
-      try {
-        user = UserGroupInformation.getCurrentUser().toString();
-      } catch (IOException e) {
-        throw new ServiceException("Failed to get current user", e);
-      }
-
-      return newUserResponse(user);
-    }
-
-    @Override
-    public TestProtos.UserResponseProto getServerRemoteUser(
-        RpcController controller,
-        TestProtos.EmptyRequestProto request) throws ServiceException {
-      String serverRemoteUser = Server.getRemoteUser().toString();
-      return newUserResponse(serverRemoteUser);
-    }
-
-    private TestProtos.UserResponseProto newUserResponse(String user) {
-      return TestProtos.UserResponseProto.newBuilder()
-          .setUser(user)
-          .build();
-    }
   }
 
   protected static TestProtos.EmptyRequestProto newEmptyRequest() {
@@ -510,4 +478,8 @@ public class TestRpcBase {
     }
     return null;
   }
+
+  protected static String convert(TestProtos.AuthUserResponseProto response) {
+    return response.getAuthUser();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
index c48ff2e..72371a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
@@ -45,55 +45,30 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import javax.security.auth.callback.*;
+import javax.security.sasl.*;
 import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.security.Security;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.KERBEROS;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.SIMPLE;
-import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.security.SaslRpcServer.AuthMethod.*;
+import static org.junit.Assert.*;
 
 /** Unit tests for using Sasl over RPC. */
 @RunWith(Parameterized.class)
 public class TestSaslRPC extends TestRpcBase {
   @Parameters
   public static Collection<Object[]> data() {
-    Collection<Object[]> params = new ArrayList<>();
+    Collection<Object[]> params = new ArrayList<Object[]>();
     for (QualityOfProtection qop : QualityOfProtection.values()) {
       params.add(new Object[]{ new QualityOfProtection[]{qop},qop, null });
     }
@@ -139,7 +114,7 @@ public class TestSaslRPC extends TestRpcBase {
     NONE(),
     VALID(),
     INVALID(),
-    OTHER()
+    OTHER();
   }
   
   @BeforeClass
@@ -255,7 +230,7 @@ public class TestSaslRPC extends TestRpcBase {
       final Server server = setupTestServer(conf, 5, sm);
       doDigestRpc(server, sm);
     } finally {
-      SecurityUtil.setSecurityInfoProviders();
+      SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
     }
   }
 
@@ -284,7 +259,7 @@ public class TestSaslRPC extends TestRpcBase {
     addr = NetUtils.getConnectAddress(server);
     TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
         .getUserName()));
-    Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId, sm);
     SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
 
@@ -321,8 +296,8 @@ public class TestSaslRPC extends TestRpcBase {
 
     // set doPing to true
     newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
-    ConnectionId remoteId = ConnectionId.getConnectionId(
-        new InetSocketAddress(0), TestRpcService.class, null, 0, null, newConf);
+    ConnectionId remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
+        TestRpcService.class, null, 0, null, newConf);
     assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
         remoteId.getPingInterval());
     // set doPing to false
@@ -831,13 +806,13 @@ public class TestSaslRPC extends TestRpcBase {
     final TestTokenSecretManager sm = new TestTokenSecretManager();
     boolean useSecretManager = (serverAuth != SIMPLE);
     if (enableSecretManager != null) {
-      useSecretManager &= enableSecretManager;
+      useSecretManager &= enableSecretManager.booleanValue();
     }
     if (forceSecretManager != null) {
-      useSecretManager |= forceSecretManager;
+      useSecretManager |= forceSecretManager.booleanValue();
     }
     final SecretManager<?> serverSm = useSecretManager ? sm : null;
-
+    
     Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
       @Override
       public Server run() throws IOException {
@@ -892,13 +867,13 @@ public class TestSaslRPC extends TestRpcBase {
             proxy.ping(null, newEmptyRequest());
             // make sure the other side thinks we are who we said we are!!!
             assertEquals(clientUgi.getUserName(),
-                proxy.getAuthUser(null, newEmptyRequest()).getUser());
+                convert(proxy.getAuthUser(null, newEmptyRequest())));
             AuthMethod authMethod =
                 convert(proxy.getAuthMethod(null, newEmptyRequest()));
             // verify sasl completed with correct QOP
             assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
-                         RPC.getConnectionIdForProxy(proxy).getSaslQop());
-            return authMethod != null ? authMethod.toString() : null;
+                RPC.getConnectionIdForProxy(proxy).getSaslQop());
+            return authMethod.toString();
           } catch (ServiceException se) {
             if (se.getCause() instanceof RemoteException) {
               throw (RemoteException) se.getCause();
@@ -923,18 +898,21 @@ public class TestSaslRPC extends TestRpcBase {
       String actual) {
     assertEquals(expect.toString(), actual);
   }
-
-  private static void assertAuthEquals(Pattern expect, String actual) {
+  
+  private static void assertAuthEquals(Pattern expect,
+      String actual) {
     // this allows us to see the regexp and the value it didn't match
     if (!expect.matcher(actual).matches()) {
-      fail(); // it failed
+      assertEquals(expect, actual); // it failed
+    } else {
+      assertTrue(true); // it matched
     }
   }
 
   /*
    * Class used to test overriding QOP values using SaslPropertiesResolver
    */
-  static class AuthSaslPropertiesResolver extends SaslPropertiesResolver {
+  static class AuthSaslPropertiesResolver extends SaslPropertiesResolver{
 
     @Override
     public Map<String, String> getServerProperties(InetAddress address) {
@@ -943,7 +921,7 @@ public class TestSaslRPC extends TestRpcBase {
       return newPropertes;
     }
   }
-
+  
   public static void main(String[] args) throws Exception {
     System.out.println("Testing Kerberos authentication over RPC");
     if (args.length != 2) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
index c4dbcac..50d389c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
@@ -17,35 +17,40 @@
  */
 package org.apache.hadoop.security;
 
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import org.junit.Assert;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.TestRpcBase;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+import org.apache.hadoop.security.token.TokenInfo;
 import org.junit.Before;
 import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Enumeration;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenSecretManager;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
+import org.apache.hadoop.ipc.TestRpcBase.TestTokenSelector;
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 
 /**
- * Test do as effective user.
+ *
  */
-public class TestDoAsEffectiveUser extends TestRpcBase {
+public class TestDoAsEffectiveUser {
   final private static String REAL_USER_NAME = "realUser1@HADOOP.APACHE.ORG";
   final private static String REAL_USER_SHORT_NAME = "realUser1";
   final private static String PROXY_USER_NAME = "proxyUser";
@@ -53,8 +58,8 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
   final private static String GROUP2_NAME = "group2";
   final private static String[] GROUP_NAMES = new String[] { GROUP1_NAME,
       GROUP2_NAME };
-
-  private TestRpcService client;
+  private static final String ADDRESS = "0.0.0.0";
+  private TestProtocol proxy;
   private static final Configuration masterConf = new Configuration();
   
   
@@ -77,7 +82,7 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
 
   private void configureSuperUserIPAddresses(Configuration conf,
       String superUserShortName) throws IOException {
-    ArrayList<String> ipList = new ArrayList<>();
+    ArrayList<String> ipList = new ArrayList<String>();
     Enumeration<NetworkInterface> netInterfaceList = NetworkInterface
         .getNetworkInterfaces();
     while (netInterfaceList.hasMoreElements()) {
@@ -125,19 +130,50 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
         curUGI.toString());
   }
 
-  private void checkRemoteUgi(final UserGroupInformation ugi,
-                              final Configuration conf) throws Exception {
+  @TokenInfo(TestTokenSelector.class)
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+
+    String aMethod() throws IOException;
+    String getServerRemoteUser() throws IOException;
+  }
+
+  public class TestImpl implements TestProtocol {
+
+    @Override
+    public String aMethod() throws IOException {
+      return UserGroupInformation.getCurrentUser().toString();
+    }
+
+    @Override
+    public String getServerRemoteUser() throws IOException {
+      return Server.getRemoteUser().toString();
+    }
+    
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return TestProtocol.versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return new ProtocolSignature(TestProtocol.versionID, null);
+    }
+  }
+
+  private void checkRemoteUgi(final Server server,
+      final UserGroupInformation ugi, final Configuration conf)
+          throws Exception {
     ugi.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
-      public Void run() throws ServiceException {
-        client = getClient(addr, conf);
-        String currentUser = client.getCurrentUser(null,
-            newEmptyRequest()).getUser();
-        String serverRemoteUser = client.getServerRemoteUser(null,
-            newEmptyRequest()).getUser();
-
-        Assert.assertEquals(ugi.toString(), currentUser);
-        Assert.assertEquals(ugi.toString(), serverRemoteUser);
+      public Void run() throws IOException {
+        proxy = RPC.getProxy(
+            TestProtocol.class, TestProtocol.versionID,
+            NetUtils.getConnectAddress(server), conf);
+        Assert.assertEquals(ugi.toString(), proxy.aMethod());
+        Assert.assertEquals(ugi.toString(), proxy.getServerRemoteUser());
         return null;
       }
     });    
@@ -149,27 +185,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     conf.setStrings(DefaultImpersonationProvider.getTestProvider().
         getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
     configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
-    // Set RPC engine to protobuf RPC engine
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 5);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(5).setVerbose(true).build();
 
     refreshConf(conf);
     try {
+      server.start();
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
-      checkRemoteUgi(realUserUgi, conf);
+      checkRemoteUgi(server, realUserUgi, conf);
       
-      UserGroupInformation proxyUserUgi =
-          UserGroupInformation.createProxyUserForTesting(
+      UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting(
           PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
-      checkRemoteUgi(proxyUserUgi, conf);
+      checkRemoteUgi(server, proxyUserUgi, conf);
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
 
@@ -180,25 +218,29 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     conf.setStrings(DefaultImpersonationProvider.getTestProvider().
             getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group1");
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 5);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
 
     refreshConf(conf);
     try {
+      server.start();
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
-      checkRemoteUgi(realUserUgi, conf);
+      checkRemoteUgi(server, realUserUgi, conf);
 
       UserGroupInformation proxyUserUgi = UserGroupInformation
           .createProxyUserForTesting(PROXY_USER_NAME, realUserUgi, GROUP_NAMES);
-      checkRemoteUgi(proxyUserUgi, conf);
+      checkRemoteUgi(server, proxyUserUgi, conf);
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
 
@@ -214,14 +256,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     conf.setStrings(DefaultImpersonationProvider.getTestProvider().
             getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group1");
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 5);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
 
     refreshConf(conf);
     
     try {
+      server.start();
+
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
 
@@ -230,10 +275,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
       String retVal = proxyUserUgi
           .doAs(new PrivilegedExceptionAction<String>() {
             @Override
-            public String run() throws ServiceException {
-              client = getClient(addr, conf);
-              return client.getCurrentUser(null,
-                  newEmptyRequest()).getUser();
+            public String run() throws IOException {
+              proxy = RPC.getProxy(TestProtocol.class,
+                  TestProtocol.versionID, addr, conf);
+              String ret = proxy.aMethod();
+              return ret;
             }
           });
 
@@ -241,7 +287,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
   
@@ -250,14 +299,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     final Configuration conf = new Configuration();
     conf.setStrings(DefaultImpersonationProvider.getTestProvider().
         getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1");
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 2);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
 
     refreshConf(conf);
 
     try {
+      server.start();
+
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
 
@@ -266,10 +318,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
       String retVal = proxyUserUgi
           .doAs(new PrivilegedExceptionAction<String>() {
             @Override
-            public String run() throws ServiceException {
-              client = getClient(addr, conf);
-              return client.getCurrentUser(null,
-                  newEmptyRequest()).getUser();
+            public String run() throws IOException {
+              proxy = RPC.getProxy(TestProtocol.class,
+                  TestProtocol.versionID, addr, conf);
+              String ret = proxy.aMethod();
+              return ret;
             }
           });
 
@@ -277,7 +330,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
 
@@ -285,12 +341,15 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
   public void testRealUserGroupNotSpecified() throws IOException {
     final Configuration conf = new Configuration();
     configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME);
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 2);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
 
     try {
+      server.start();
+
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
 
@@ -299,10 +358,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
       String retVal = proxyUserUgi
           .doAs(new PrivilegedExceptionAction<String>() {
             @Override
-            public String run() throws ServiceException {
-              client = getClient(addr, conf);
-              return client.getCurrentUser(null,
-                  newEmptyRequest()).getUser();
+            public String run() throws IOException {
+              proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+                  TestProtocol.versionID, addr, conf);
+              String ret = proxy.aMethod();
+              return ret;
             }
           });
 
@@ -310,7 +370,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
   
@@ -321,14 +384,17 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     conf.setStrings(DefaultImpersonationProvider.getTestProvider().
             getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME),
         "group3");
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
-    UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 2);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(2).setVerbose(false).build();
     
     refreshConf(conf);
 
     try {
+      server.start();
+
+      final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
       UserGroupInformation realUserUgi = UserGroupInformation
           .createRemoteUser(REAL_USER_NAME);
 
@@ -337,10 +403,11 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
       String retVal = proxyUserUgi
           .doAs(new PrivilegedExceptionAction<String>() {
             @Override
-            public String run() throws ServiceException {
-              client = getClient(addr, conf);
-              return client.getCurrentUser(null,
-                  newEmptyRequest()).getUser();
+            public String run() throws IOException {
+              proxy = RPC.getProxy(TestProtocol.class,
+                  TestProtocol.versionID, addr, conf);
+              String ret = proxy.aMethod();
+              return ret;
             }
           });
 
@@ -348,7 +415,10 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
-      stop(server, client);
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
     }
   }
 
@@ -362,17 +432,20 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     final Configuration conf = new Configuration(masterConf);
     TestTokenSecretManager sm = new TestTokenSecretManager();
     SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
-    RPC.setProtocolEngine(conf, TestRpcService.class,
-        ProtobufRpcEngine.class);
     UserGroupInformation.setConfiguration(conf);
-    final Server server = setupTestServer(conf, 5, sm);
+    final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
+        .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
+
+    server.start();
 
     final UserGroupInformation current = UserGroupInformation
         .createRemoteUser(REAL_USER_NAME);    
-
+    
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
         .getUserName()), new Text("SomeSuperUser"));
-    Token<TestTokenIdentifier> token = new Token<>(tokenId,
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
     SecurityUtil.setTokenService(token, addr);
     UserGroupInformation proxyUserUgi = UserGroupInformation
@@ -380,19 +453,23 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     proxyUserUgi.addToken(token);
     
     refreshConf(conf);
-
+    
     String retVal = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
       @Override
       public String run() throws Exception {
         try {
-          client = getClient(addr, conf);
-          return client.getCurrentUser(null,
-              newEmptyRequest()).getUser();
+          proxy = RPC.getProxy(TestProtocol.class,
+              TestProtocol.versionID, addr, conf);
+          String ret = proxy.aMethod();
+          return ret;
         } catch (Exception e) {
           e.printStackTrace();
           throw e;
         } finally {
-          stop(server, client);
+          server.stop();
+          if (proxy != null) {
+            RPC.stopProxy(proxy);
+          }
         }
       }
     });
@@ -409,34 +486,42 @@ public class TestDoAsEffectiveUser extends TestRpcBase {
     TestTokenSecretManager sm = new TestTokenSecretManager();
     final Configuration newConf = new Configuration(masterConf);
     SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
-    // Set RPC engine to protobuf RPC engine
-    RPC.setProtocolEngine(newConf, TestRpcService.class,
-        ProtobufRpcEngine.class);
     UserGroupInformation.setConfiguration(newConf);
-    final Server server = setupTestServer(newConf, 5, sm);
+    final Server server = new RPC.Builder(newConf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .setSecretManager(sm).build();
+
+    server.start();
 
     final UserGroupInformation current = UserGroupInformation
         .createUserForTesting(REAL_USER_NAME, GROUP_NAMES);
     
     refreshConf(newConf);
-
+    
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
         .getUserName()), new Text("SomeSuperUser"));
-    Token<TestTokenIdentifier> token = new Token<>(tokenId, sm);
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
+        sm);
     SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
     String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
       @Override
       public String run() throws Exception {
         try {
-          client = getClient(addr, newConf);
-          return client.getCurrentUser(null,
-              newEmptyRequest()).getUser();
+          proxy = RPC.getProxy(TestProtocol.class,
+              TestProtocol.versionID, addr, newConf);
+          String ret = proxy.aMethod();
+          return ret;
         } catch (Exception e) {
           e.printStackTrace();
           throw e;
         } finally {
-          stop(server, client);
+          server.stop();
+          if (proxy != null) {
+            RPC.stopProxy(proxy);
+          }
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
index e45d70d..b3ea5f2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.TestRpcBase.TestTokenIdentifier;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -29,11 +28,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 
 import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
@@ -55,22 +50,9 @@ import java.util.Set;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
-import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
-import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
-import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.ipc.TestSaslRPC.*;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,7 +109,7 @@ public class TestUserGroupInformation {
     UserGroupInformation.setLoginUser(null);
   }
 
-  @Test(timeout = 30000)
+  @Test (timeout = 30000)
   public void testSimpleLogin() throws IOException {
     tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/proto/test.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
index 6411f97..99cd93d 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
@@ -88,6 +88,6 @@ message AuthMethodResponseProto {
   required string mechanismName = 2;
 }
 
-message UserResponseProto {
-  required string user = 1;
+message AuthUserResponseProto {
+  required string authUser = 1;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
index 06f6c4f..3292115 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
@@ -40,11 +40,9 @@ service TestProtobufRpcProto {
   rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
   rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
   rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
-  rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
+  rpc getAuthUser(EmptyRequestProto) returns (AuthUserResponseProto);
   rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
   rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
-  rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
-  rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
 }
 
 service TestProtobufRpc2Proto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57f7cb1..6b52949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -168,6 +168,7 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntry;
 import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.ipc.WritableRpcEngine;
 import org.apache.hadoop.ipc.RefreshRegistry;
 import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.Node;
@@ -316,6 +317,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         new TraceAdminProtocolServerSideTranslatorPB(this);
     BlockingService traceAdminService = TraceAdminService
         .newReflectiveBlockingService(traceAdminXlator);
+    
+    WritableRpcEngine.ensureInitialized();
 
     InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
     if (serviceRpcAddr != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
new file mode 100644
index 0000000..0b7ee33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.mockito.Mockito.mock;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/** Unit tests for using Delegation Token over RPC. */
+public class TestClientProtocolWithDelegationToken {
+  private static final String ADDRESS = "0.0.0.0";
+
+  public static final Log LOG = LogFactory
+      .getLog(TestClientProtocolWithDelegationToken.class);
+
+  private static final Configuration conf;
+  static {
+    conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
+  static {
+    GenericTestUtils.setLogLevel(Client.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(Server.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
+  }
+
+  @Test
+  public void testDelegationTokenRpc() throws Exception {
+    ClientProtocol mockNN = mock(ClientProtocol.class);
+    FSNamesystem mockNameSys = mock(FSNamesystem.class);
+
+    DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        3600000, mockNameSys);
+    sm.startThreads();
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(ClientProtocol.class).setInstance(mockNN)
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .setSecretManager(sm).build();
+    
+    server.start();
+
+    final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    String user = current.getUserName();
+    Text owner = new Text(user);
+    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
+    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+        dtId, sm);
+    SecurityUtil.setTokenService(token, addr);
+    LOG.info("Service for token is " + token.getService());
+    current.addToken(token);
+    current.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        ClientProtocol proxy = null;
+        try {
+          proxy = RPC.getProxy(ClientProtocol.class,
+              ClientProtocol.versionID, addr, conf);
+          proxy.getServerDefaults();
+        } finally {
+          server.stop();
+          if (proxy != null) {
+            RPC.stopProxy(proxy);
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d355573f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
index 729af0a..3fef5e2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.WritableRpcEngine;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -97,6 +98,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
     BlockingService refreshHSAdminProtocolService = HSAdminRefreshProtocolService
         .newReflectiveBlockingService(refreshHSAdminProtocolXlator);
 
+    WritableRpcEngine.ensureInitialized();
+
     clientRpcAddress = conf.getSocketAddr(
         JHAdminConfig.MR_HISTORY_BIND_HOST,
         JHAdminConfig.JHS_ADMIN_ADDRESS,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org