You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/07 16:22:14 UTC

hbase git commit: HBASE-15913 Sasl encryption doesn't work with AsyncRpcChannel

Repository: hbase
Updated Branches:
  refs/heads/branch-1 fad99a3f6 -> 3ff082cb8


HBASE-15913 Sasl encryption doesn't work with AsyncRpcChannel


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ff082cb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ff082cb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ff082cb

Branch: refs/heads/branch-1
Commit: 3ff082cb877a6401463eb3cbcd05e8736b74ee22
Parents: fad99a3
Author: stack <st...@apache.org>
Authored: Tue Jun 7 09:21:58 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 7 09:22:09 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  61 +++-
 .../hbase/security/SaslClientHandler.java       |  22 +-
 .../hbase/security/AbstractTestSecureIPC.java   | 305 +++++++++++++++++++
 .../hbase/security/TestAsyncSecureIPC.java      |  33 ++
 .../hadoop/hbase/security/TestSecureIPC.java    |  33 ++
 .../hadoop/hbase/security/TestSecureRPC.java    | 291 ------------------
 6 files changed, 441 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index e24050b..878d8b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -241,6 +241,30 @@ public class AsyncRpcChannel {
   }
 
   /**
+   * Start HBase connection with sasl encryption
+   * @param ch channel to start connection on
+   */
+  private void startConnectionWithEncryption(Channel ch) {
+    // for rpc encryption, the order of ChannelInboundHandler should be:
+    // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
+    // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
+    // SaslClientHandler will handle this
+    ch.pipeline().addFirst("beforeUnwrapDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
+    ch.pipeline().addLast("afterUnwrapDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+    List<AsyncCall> callsToWrite;
+    synchronized (pendingCalls) {
+      connected = true;
+      callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+    }
+    for (AsyncCall call : callsToWrite) {
+      writeRequest(call);
+    }
+  }
+
+  /**
    * Get SASL handler
    * @param bootstrap to reconnect to
    * @return new SASL handler
@@ -252,6 +276,7 @@ public class AsyncRpcChannel {
         client.fallbackAllowed,
         client.conf.get("hbase.rpc.protection",
           SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
+        getChannelHeaderBytes(authMethod),
         new SaslClientHandler.SaslExceptionHandler() {
           @Override
           public void handle(int retryCount, Random random, Throwable cause) {
@@ -275,6 +300,11 @@ public class AsyncRpcChannel {
           public void onSuccess(Channel channel) {
             startHBaseConnection(channel);
           }
+
+          @Override
+          public void onSaslProtectionSucess(Channel channel) {
+            startConnectionWithEncryption(channel);
+          }
         });
   }
 
@@ -358,6 +388,26 @@ public class AsyncRpcChannel {
    * @throws java.io.IOException on failure to write
    */
   private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
+    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
+
+    ByteBuf b = channel.alloc().directBuffer(totalSize);
+
+    b.writeInt(header.getSerializedSize());
+    b.writeBytes(header.toByteArray());
+
+    return channel.writeAndFlush(b);
+  }
+
+  private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
+    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+    ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
+    b.putInt(header.getSerializedSize());
+    b.put(header.toByteArray());
+    return b.array();
+  }
+
+  private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
     RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
         .setServiceName(serviceName);
 
@@ -374,16 +424,7 @@ public class AsyncRpcChannel {
     }
 
     headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
-    RPCProtos.ConnectionHeader header = headerBuilder.build();
-
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-
-    ByteBuf b = channel.alloc().directBuffer(totalSize);
-
-    b.writeInt(header.getSerializedSize());
-    b.writeBytes(header.toByteArray());
-
-    return channel.writeAndFlush(b);
+    return headerBuilder.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
index f52987b..c79cde7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
@@ -61,6 +61,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
   private final SaslExceptionHandler exceptionHandler;
   private final SaslSuccessfulConnectHandler successfulConnectHandler;
   private byte[] saslToken;
+  private byte[] connectionHeader;
   private boolean firstRead = true;
 
   private int retryCount = 0;
@@ -82,10 +83,11 @@ public class SaslClientHandler extends ChannelDuplexHandler {
    */
   public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
       Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
-      String rpcProtection, SaslExceptionHandler exceptionHandler,
+      String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler,
       SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException {
     this.ticket = ticket;
     this.fallbackAllowed = fallbackAllowed;
+    this.connectionHeader = connectionHeader;
 
     this.exceptionHandler = exceptionHandler;
     this.successfulConnectHandler = successfulConnectHandler;
@@ -236,8 +238,13 @@ public class SaslClientHandler extends ChannelDuplexHandler {
 
         if (!useWrap) {
           ctx.pipeline().remove(this);
+          successfulConnectHandler.onSuccess(ctx.channel());
+        } else {
+          byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length);
+          // write connection header
+          writeSaslToken(ctx, wrappedCH);
+          successfulConnectHandler.onSaslProtectionSucess(ctx.channel());
         }
-        successfulConnectHandler.onSuccess(ctx.channel());
       }
     }
     // Normal wrapped reading
@@ -322,9 +329,11 @@ public class SaslClientHandler extends ChannelDuplexHandler {
       super.write(ctx, msg, promise);
     } else {
       ByteBuf in = (ByteBuf) msg;
+      byte[] unwrapped = new byte[in.readableBytes()];
+      in.readBytes(unwrapped);
 
       try {
-        saslToken = saslClient.wrap(in.array(), in.readerIndex(), in.readableBytes());
+        saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length);
       } catch (SaslException se) {
         try {
           saslClient.dispose();
@@ -375,5 +384,12 @@ public class SaslClientHandler extends ChannelDuplexHandler {
      * @param channel which is successfully authenticated
      */
     public void onSuccess(Channel channel);
+
+    /**
+     * Runs on success if data protection is used in SASL
+     *
+     * @param channel which is successfully authenticated
+     */
+    public void onSaslProtectionSucess(Channel channel);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
new file mode 100644
index 0000000..e2eb84b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.BlockingService;
+
+import javax.security.sasl.SaslException;
+
+public abstract class AbstractTestSecureIPC {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
+      .getPath());
+
+  static final BlockingService SERVICE =
+      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
+          new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
+
+            @Override
+            public TestProtos.EmptyResponseProto ping(RpcController controller,
+                                                      TestProtos.EmptyRequestProto request)
+                throws ServiceException {
+              return null;
+            }
+
+            @Override
+            public TestProtos.EmptyResponseProto error(RpcController controller,
+                                                       TestProtos.EmptyRequestProto request)
+                throws ServiceException {
+              return null;
+            }
+
+            @Override
+            public TestProtos.EchoResponseProto echo(RpcController controller,
+                                                     TestProtos.EchoRequestProto request)
+                throws ServiceException {
+              if (controller instanceof PayloadCarryingRpcController) {
+                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
+                // If cells, scan them to check we are able to iterate what we were given and since
+                // this is
+                // an echo, just put them back on the controller creating a new block. Tests our
+                // block
+                // building.
+                CellScanner cellScanner = pcrc.cellScanner();
+                List<Cell> list = null;
+                if (cellScanner != null) {
+                  list = new ArrayList<Cell>();
+                  try {
+                    while (cellScanner.advance()) {
+                      list.add(cellScanner.current());
+                    }
+                  } catch (IOException e) {
+                    throw new ServiceException(e);
+                  }
+                }
+                cellScanner = CellUtil.createCellScanner(list);
+                ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+              }
+              return TestProtos.EchoResponseProto.newBuilder()
+                  .setMessage(request.getMessage()).build();
+            }
+          });
+
+  private static MiniKdc KDC;
+  private static String HOST = "localhost";
+  private static String PRINCIPAL;
+
+  String krbKeytab;
+  String krbPrincipal;
+  UserGroupInformation ugi;
+  Configuration clientConf;
+  Configuration serverConf;
+
+  abstract Class<? extends RpcClient> getRpcClientClass();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Properties conf = MiniKdc.createConf();
+    conf.put(MiniKdc.DEBUG, true);
+    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
+    KDC.start();
+    PRINCIPAL = "hbase/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (KDC != null) {
+      KDC.stop();
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUpTest() throws Exception {
+    krbKeytab = getKeytabFileForTesting();
+    krbPrincipal = getPrincipalForTesting();
+    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+    clientConf = getSecuredConfiguration();
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
+    serverConf = getSecuredConfiguration();
+  }
+
+  @Test
+  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
+    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
+
+    // check that the login user is okay:
+    assertSame(ugi, ugi2);
+    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
+    assertEquals(krbPrincipal, ugi.getUserName());
+
+    callRpcService(User.create(ugi2));
+  }
+
+  @Test
+  public void testRpcFallbackToSimpleAuth() throws Exception {
+    String clientUsername = "testuser";
+    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
+        new String[]{clientUsername});
+
+    // check that the client user is insecure
+    assertNotSame(ugi, clientUgi);
+    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
+    assertEquals(clientUsername, clientUgi.getUserName());
+
+    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
+    callRpcService(User.create(clientUgi));
+  }
+
+  void setRpcProtection(String clientProtection, String serverProtection) {
+    clientConf.set("hbase.rpc.protection", clientProtection);
+    serverConf.set("hbase.rpc.protection", serverProtection);
+  }
+
+  /**
+   * Test various QOPs (qualities of protection) of Server and Client.
+   * @throws Exception
+   */
+  @Test
+  public void testSaslWithCommonQop() throws Exception {
+    setRpcProtection("authentication", "authentication");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("integrity", "integrity");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("privacy", "privacy");
+    callRpcService(User.create(ugi));
+  }
+
+  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+      throws Exception {
+    Configuration cnf = new Configuration();
+    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(cnf);
+    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Sets up an RPC Server and a Client. Does an RPC and checks the result. If an exception is
+   * thrown from the stub, this function will throw the root cause of that exception.
+   */
+  private void callRpcService(User clientUser) throws Exception {
+    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+    Mockito.when(securityInfoMock.getServerPrincipal())
+        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
+
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+
+    RpcServerInterface rpcServer =
+        new RpcServer(null, "AbstractTestSecureIPC",
+            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
+            serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
+        HConstants.DEFAULT_CLUSTER_ID.toString())) {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      BlockingRpcChannel channel =
+          rpcClient.createBlockingRpcChannel(
+              ServerName.valueOf(address.getHostName(), address.getPort(),
+                  System.currentTimeMillis()), clientUser, 0);
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
+      TestThread th1 = new TestThread(stub);
+      final Throwable exception[] = new Throwable[1];
+      Collections.synchronizedList(new ArrayList<Throwable>());
+      Thread.UncaughtExceptionHandler exceptionHandler =
+          new Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread th, Throwable ex) {
+              exception[0] = ex;
+            }
+          };
+      th1.setUncaughtExceptionHandler(exceptionHandler);
+      th1.start();
+      th1.join();
+      if (exception[0] != null) {
+        // throw root cause.
+        while (exception[0].getCause() != null) {
+          exception[0] = exception[0].getCause();
+        }
+        throw (Exception) exception[0];
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  public static class TestThread extends Thread {
+    private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+
+    public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) {
+      this.stub = stub;
+    }
+
+    @Override
+    public void run() {
+      try {
+        int[] messageSize = new int[] {100, 1000, 10000};
+        for (int i = 0; i < messageSize.length; i++) {
+          String input = RandomStringUtils.random(messageSize[i]);
+          String result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder()
+              .setMessage(input).build()).getMessage();
+          assertEquals(input, result);
+        }
+      } catch (ServiceException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
new file mode 100644
index 0000000..2fc270d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
+
+  Class<? extends RpcClient> getRpcClientClass() {
+    return AsyncRpcClient.class;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
new file mode 100644
index 0000000..baaa985
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestSecureIPC extends AbstractTestSecureIPC {
+
+  Class<? extends RpcClient> getRpcClientClass() {
+    return RpcClientImpl.class;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ff082cb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
deleted file mode 100644
index a5700d0..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-
-@Category(SmallTests.class)
-public class TestSecureRPC {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
-      .getPath());
-
-  static final BlockingService SERVICE =
-      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
-          new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
-
-            @Override
-            public TestProtos.EmptyResponseProto ping(RpcController controller,
-                                                      TestProtos.EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public TestProtos.EmptyResponseProto error(RpcController controller,
-                                                       TestProtos.EmptyRequestProto request)
-                throws ServiceException {
-              return null;
-            }
-
-            @Override
-            public TestProtos.EchoResponseProto echo(RpcController controller,
-                                                     TestProtos.EchoRequestProto request)
-                throws ServiceException {
-              if (controller instanceof PayloadCarryingRpcController) {
-                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
-                // If cells, scan them to check we are able to iterate what we were given and since
-                // this is
-                // an echo, just put them back on the controller creating a new block. Tests our
-                // block
-                // building.
-                CellScanner cellScanner = pcrc.cellScanner();
-                List<Cell> list = null;
-                if (cellScanner != null) {
-                  list = new ArrayList<Cell>();
-                  try {
-                    while (cellScanner.advance()) {
-                      list.add(cellScanner.current());
-                    }
-                  } catch (IOException e) {
-                    throw new ServiceException(e);
-                  }
-                }
-                cellScanner = CellUtil.createCellScanner(list);
-                ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
-              }
-              return TestProtos.EchoResponseProto.newBuilder()
-                  .setMessage(request.getMessage()).build();
-            }
-          });
-
-  private static MiniKdc KDC;
-
-  private static String HOST = "localhost";
-
-  private static String PRINCIPAL;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    Properties conf = MiniKdc.createConf();
-    conf.put(MiniKdc.DEBUG, true);
-    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
-    KDC.start();
-    PRINCIPAL = "hbase/" + HOST;
-    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
-    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
-    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (KDC != null) {
-      KDC.stop();
-    }
-    TEST_UTIL.cleanupTestDir();
-  }
-
-  @Test
-  public void testRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testAsyncRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
-  }
-
-  @Test
-  public void testAsyncRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
-  }
-
-  private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
-      throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
-
-    // check that the login user is okay:
-    assertSame(ugi, ugi2);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    Configuration clientConf = getSecuredConfiguration();
-    callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
-  }
-
-  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
-      throws Exception {
-    Configuration cnf = new Configuration();
-    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(cnf);
-    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
-    return UserGroupInformation.getLoginUser();
-  }
-
-  private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
-                              Configuration clientConf, boolean allowInsecureFallback)
-      throws Exception {
-    Configuration clientConfCopy = new Configuration(clientConf);
-    clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
-
-    Configuration conf = getSecuredConfiguration();
-    conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
-
-    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
-    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
-
-    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
-
-    RpcServerInterface rpcServer =
-        new RpcServer(null, "AbstractTestSecureIPC",
-            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
-            conf, new FifoRpcScheduler(conf, 1));
-    rpcServer.start();
-    try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
-        HConstants.DEFAULT_CLUSTER_ID.toString())) {
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      BlockingRpcChannel channel =
-          rpcClient.createBlockingRpcChannel(
-
-            ServerName.valueOf(address.getHostName(), address.getPort(),
-            System.currentTimeMillis()), clientUser, 5000);
-      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
-          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
-      List<String> results = new ArrayList<String>();
-      TestThread th1 = new TestThread(stub, results);
-      th1.start();
-      th1.join();
-
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    String clientUsername = "testuser";
-    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
-        new String[]{clientUsername});
-
-    // check that the client user is insecure
-    assertNotSame(ugi, clientUgi);
-    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
-    assertEquals(clientUsername, clientUgi.getUserName());
-
-    Configuration clientConf = new Configuration();
-    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
-    callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
-  }
-
-  public static class TestThread extends Thread {
-      private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
-
-      private final List<String> results;
-
-          public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
-          this.stub = stub;
-          this.results = results;
-        }
-
-          @Override
-      public void run() {
-          String result;
-          try {
-              result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
-                  ThreadLocalRandom.current().nextInt())).build()).getMessage();
-            } catch (ServiceException e) {
-              throw new RuntimeException(e);
-            }
-          if (results != null) {
-              synchronized (results) {
-                  results.add(result);
-                }
-            }
-        }
-    }
-}