You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/01/15 01:40:10 UTC

hbase git commit: HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Appy)

Repository: hbase
Updated Branches:
  refs/heads/master ae7cc0c84 -> 4ac8d4ce6


HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Appy)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4ac8d4ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4ac8d4ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4ac8d4ce

Branch: refs/heads/master
Commit: 4ac8d4ce610a107112acb6aa070157691c022e90
Parents: ae7cc0c
Author: tedyu <yu...@gmail.com>
Authored: Thu Jan 14 16:39:52 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Thu Jan 14 16:39:52 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  33 +-
 .../hbase/security/HBaseSaslRpcClient.java      |   8 +-
 .../hbase/security/SaslClientHandler.java       |  30 +-
 .../apache/hadoop/hbase/security/SaslUtil.java  |  74 +++--
 .../hbase/security/TestHBaseSaslRpcClient.java  | 309 ++++++++++++++++++
 .../hadoop/hbase/security/TestSaslUtil.java     |  59 ++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |   4 +-
 .../hbase/security/HBaseSaslRpcServer.java      |   9 +-
 .../hbase/security/AbstractTestSecureIPC.java   | 245 ++++++++++++++
 .../hbase/security/TestAsyncSecureIPC.java      |  33 ++
 .../hbase/security/TestHBaseSaslRpcClient.java  | 325 -------------------
 .../hadoop/hbase/security/TestSecureIPC.java    |  33 ++
 .../hadoop/hbase/security/TestSecureRPC.java    | 216 ------------
 13 files changed, 751 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 44e8322..69978fc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -32,7 +32,6 @@ import io.netty.util.concurrent.Promise;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -104,8 +103,7 @@ public class AsyncRpcChannel {
   final String serviceName;
   final InetSocketAddress address;
 
-  private int ioFailureCounter = 0;
-  private int connectFailureCounter = 0;
+  private int failureCounter = 0;
 
   boolean useSasl;
   AuthMethod authMethod;
@@ -134,7 +132,7 @@ public class AsyncRpcChannel {
    * @param bootstrap to construct channel on
    * @param client    to connect with
    * @param ticket of user which uses connection
-   *               @param serviceName name of service to connect to
+   * @param serviceName name of service to connect to
    * @param address to connect to
    */
   public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
@@ -166,11 +164,7 @@ public class AsyncRpcChannel {
           @Override
           public void operationComplete(final ChannelFuture f) throws Exception {
             if (!f.isSuccess()) {
-              if (f.cause() instanceof SocketException) {
-                retryOrClose(bootstrap, connectFailureCounter++, f.cause());
-              } else {
-                retryOrClose(bootstrap, ioFailureCounter++, f.cause());
-              }
+              retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
               return;
             }
             channel = f.channel();
@@ -263,13 +257,8 @@ public class AsyncRpcChannel {
               // Handle Sasl failure. Try to potentially get new credentials
               handleSaslConnectionFailure(retryCount, cause, realTicket);
 
-              // Try to reconnect
-              client.newTimeout(new TimerTask() {
-                @Override
-                public void run(Timeout timeout) throws Exception {
-                  connect(bootstrap);
-                }
-              }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
+              retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
+                  cause);
             } catch (IOException | InterruptedException e) {
               close(e);
             }
@@ -286,16 +275,18 @@ public class AsyncRpcChannel {
    * Retry to connect or close
    *
    * @param bootstrap      to connect with
-   * @param connectCounter amount of tries
+   * @param failureCount   failure count
    * @param e              exception of fail
    */
-  private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
-    if (connectCounter < client.maxRetries) {
+  private void retryOrClose(final Bootstrap bootstrap, int failureCount,
+      long timeout, Throwable e) {
+    if (failureCount < client.maxRetries) {
       client.newTimeout(new TimerTask() {
-        @Override public void run(Timeout timeout) throws Exception {
+        @Override
+        public void run(Timeout timeout) throws Exception {
           connect(bootstrap);
         }
-      }, client.failureSleep, TimeUnit.MILLISECONDS);
+      }, timeout, TimeUnit.MILLISECONDS);
     } else {
       client.failedServers.addToFailedServers(address);
       close(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index ab3ee0e..ce32ed9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -59,6 +60,7 @@ public class HBaseSaslRpcClient {
 
   private final SaslClient saslClient;
   private final boolean fallbackAllowed;
+  protected final Map<String, String> saslProps;
   /**
    * Create a HBaseSaslRpcClient for an authentication method
    *
@@ -96,7 +98,7 @@ public class HBaseSaslRpcClient {
       Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
       String rpcProtection) throws IOException {
     this.fallbackAllowed = fallbackAllowed;
-    SaslUtil.initSaslProperties(rpcProtection);
+    saslProps = SaslUtil.initSaslProperties(rpcProtection);
     switch (method) {
     case DIGEST:
       if (LOG.isDebugEnabled())
@@ -138,13 +140,13 @@ public class HBaseSaslRpcClient {
       String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
       throws IOException {
     return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
-        SaslUtil.SASL_PROPS, saslClientCallbackHandler);
+        saslProps, saslClientCallbackHandler);
   }
 
   protected SaslClient createKerberosSaslClient(String[] mechanismNames,
       String userFirstPart, String userSecondPart) throws IOException {
     return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
-        userSecondPart, SaslUtil.SASL_PROPS, null);
+        userSecondPart, saslProps, null);
   }
 
   private static void readStatus(DataInputStream inStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
index f52987b..bfb625b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
@@ -41,6 +41,7 @@ import javax.security.sasl.SaslException;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.Random;
 
 /**
@@ -58,6 +59,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
    * Used for client or server's token to send or receive from each other.
    */
   private final SaslClient saslClient;
+  private final Map<String, String> saslProps;
   private final SaslExceptionHandler exceptionHandler;
   private final SaslSuccessfulConnectHandler successfulConnectHandler;
   private byte[] saslToken;
@@ -67,8 +69,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
   private Random random;
 
   /**
-   * Constructor
-   *
    * @param ticket                   the ugi
    * @param method                   auth method
    * @param token                    for Sasl
@@ -76,8 +76,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
    * @param fallbackAllowed          True if server may also fall back to less secure connection
    * @param rpcProtection            Quality of protection. Can be 'authentication', 'integrity' or
    *                                 'privacy'.
-   * @param exceptionHandler         handler for exceptions
-   * @param successfulConnectHandler handler for succesful connects
    * @throws java.io.IOException if handler could not be created
    */
   public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
@@ -90,7 +88,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
     this.exceptionHandler = exceptionHandler;
     this.successfulConnectHandler = successfulConnectHandler;
 
-    SaslUtil.initSaslProperties(rpcProtection);
+    saslProps = SaslUtil.initSaslProperties(rpcProtection);
     switch (method) {
     case DIGEST:
       if (LOG.isDebugEnabled())
@@ -125,32 +123,23 @@ public class SaslClientHandler extends ChannelDuplexHandler {
 
   /**
    * Create a Digest Sasl client
-   *
-   * @param mechanismNames            names of mechanisms
-   * @param saslDefaultRealm          default realm for sasl
-   * @param saslClientCallbackHandler handler for the client
-   * @return new SaslClient
-   * @throws java.io.IOException if creation went wrong
    */
   protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
       CallbackHandler saslClientCallbackHandler) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, SaslUtil.SASL_PROPS,
+    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
         saslClientCallbackHandler);
   }
 
   /**
    * Create Kerberos client
    *
-   * @param mechanismNames names of mechanisms
    * @param userFirstPart  first part of username
    * @param userSecondPart second part of username
-   * @return new SaslClient
-   * @throws java.io.IOException if fails
    */
   protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
       String userSecondPart) throws IOException {
     return Sasl
-        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, SaslUtil.SASL_PROPS,
+        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
             null);
   }
 
@@ -269,11 +258,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
     }
   }
 
-  /**
-   * Write SASL token
-   * @param ctx to write to
-   * @param saslToken to write
-   */
   private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) {
     ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
     b.writeInt(saslToken.length);
@@ -290,9 +274,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
 
   /**
    * Get the read status
-   *
-   * @param inStream to read
-   * @throws org.apache.hadoop.ipc.RemoteException if status was not success
    */
   private static void readStatus(ByteBuf inStream) throws RemoteException {
     int status = inStream.readInt(); // read status
@@ -360,7 +341,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
      *
      * @param retryCount current retry count
      * @param random     to create new backoff with
-     * @param cause      of fail
      */
     public void handle(int retryCount, Random random, Throwable cause);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index f2f3393..b505fc0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -32,24 +32,31 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class SaslUtil {
   private static final Log log = LogFactory.getLog(SaslUtil.class);
   public static final String SASL_DEFAULT_REALM = "default";
-  public static final Map<String, String> SASL_PROPS =
-      new TreeMap<String, String>();
   public static final int SWITCH_TO_SIMPLE_AUTH = -88;
 
-  public static enum QualityOfProtection {
+  public enum QualityOfProtection {
     AUTHENTICATION("auth"),
     INTEGRITY("auth-int"),
     PRIVACY("auth-conf");
 
-    public final String saslQop;
+    private final String saslQop;
 
-    private QualityOfProtection(String saslQop) {
+    QualityOfProtection(String saslQop) {
       this.saslQop = saslQop;
     }
 
     public String getSaslQop() {
       return saslQop;
     }
+
+    public boolean matches(String stringQop) {
+      if (saslQop.equals(stringQop)) {
+        log.warn("Use authentication/integrity/privacy as value for rpc protection "
+            + "configurations instead of auth/auth-int/auth-conf.");
+        return true;
+      }
+      return name().equalsIgnoreCase(stringQop);
+    }
   }
 
   /** Splitting fully qualified Kerberos name into parts */
@@ -71,40 +78,39 @@ public class SaslUtil {
 
   /**
    * Returns {@link org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection}
-   * corresponding to the given {@code stringQop} value. Returns null if value is
-   * invalid.
+   * corresponding to the given {@code stringQop} value.
+   * @throws IllegalArgumentException If stringQop doesn't match any QOP.
    */
   public static QualityOfProtection getQop(String stringQop) {
-    QualityOfProtection qop = null;
-    if (QualityOfProtection.AUTHENTICATION.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.AUTHENTICATION;
-    } else if (QualityOfProtection.INTEGRITY.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.INTEGRITY;
-    } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.PRIVACY;
-    }
-    if (qop == null) {
-      throw new IllegalArgumentException("Invalid qop: " +  stringQop
-          + ". It must be one of 'authentication', 'integrity', 'privacy'.");
+    for (QualityOfProtection qop : QualityOfProtection.values()) {
+      if (qop.matches(stringQop)) {
+        return qop;
+      }
     }
-    if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      log.warn("Use authentication/integrity/privacy as value for rpc protection "
-          + "configurations instead of auth/auth-int/auth-conf.");
-    }
-    return qop;
+    throw new IllegalArgumentException("Invalid qop: " +  stringQop
+        + ". It must be one of 'authentication', 'integrity', 'privacy'.");
   }
 
-  static void initSaslProperties(String rpcProtection) {
-    QualityOfProtection saslQOP = getQop(rpcProtection);
-    if (saslQOP == null) {
-      saslQOP = QualityOfProtection.AUTHENTICATION;
+  /**
+   * @param rpcProtection Value of 'hbase.rpc.protection' configuration.
+   * @return Map with values for SASL properties.
+   */
+  static Map<String, String> initSaslProperties(String rpcProtection) {
+    String saslQop;
+    if (rpcProtection.isEmpty()) {
+      saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
+    } else {
+      String[] qops = rpcProtection.split(",");
+      StringBuilder saslQopBuilder = new StringBuilder();
+      for (int i = 0; i < qops.length; ++i) {
+        QualityOfProtection qop = getQop(qops[i]);
+        saslQopBuilder.append(",").append(qop.getSaslQop());
+      }
+      saslQop = saslQopBuilder.substring(1);  // remove first ','
     }
-    SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
-    SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+    Map<String, String> saslProps = new TreeMap<>();
+    saslProps.put(Sasl.QOP, saslQop);
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    return saslProps;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
new file mode 100644
index 0000000..0e3aeab
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.TextOutputCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.base.Strings;
+
+@Category({SecurityTests.class, SmallTests.class})
+public class TestHBaseSaslRpcClient {
+
+  static {
+    System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
+    System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
+  }
+
+  static final String DEFAULT_USER_NAME = "principal";
+  static final String DEFAULT_USER_PASSWORD = "password";
+
+  private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class);
+
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void before() {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+  }
+
+  @Test
+  public void testSaslClientUsesGivenRpcProtection() throws Exception {
+    Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
+        DEFAULT_USER_PASSWORD);
+    for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
+      String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
+          "principal/host@DOMAIN.COM", false, qop.name()) {
+        public String getQop() {
+          return saslProps.get(Sasl.QOP);
+        }
+      }.getQop();
+      assertEquals(negotiatedQop, qop.getSaslQop());
+    }
+  }
+
+  @Test
+  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+
+    final NameCallback nameCallback = mock(NameCallback.class);
+    final PasswordCallback passwordCallback = mock(PasswordCallback.class);
+    final RealmCallback realmCallback = mock(RealmCallback.class);
+    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
+
+    Callback[] callbackArray = {nameCallback, passwordCallback,
+        realmCallback, realmChoiceCallback};
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    saslClCallbackHandler.handle(callbackArray);
+    verify(nameCallback).setName(anyString());
+    verify(realmCallback).setText(anyString());
+    verify(passwordCallback).setPassword(any(char[].class));
+  }
+
+  @Test
+  public void testSaslClientCallbackHandlerWithException() {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    try {
+      saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
+    } catch (UnsupportedCallbackException expEx) {
+      //expected
+    } catch (Exception ex) {
+      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testHBaseSaslRpcClientCreation() throws Exception {
+    //creation kerberos principal check section
+    assertFalse(assertSuccessCreationKerberosPrincipal(null));
+    assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM"));
+    assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM"));
+    if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) {
+      // XXX: This can fail if kerberos support in the OS is not sane, see HBASE-10107.
+      // For now, don't assert, just warn
+      LOG.warn("Could not create a SASL client with valid Kerberos credential");
+    }
+
+    //creation digest principal check section
+    assertFalse(assertSuccessCreationDigestPrincipal(null, null));
+    assertFalse(assertSuccessCreationDigestPrincipal("", ""));
+    assertFalse(assertSuccessCreationDigestPrincipal("", null));
+    assertFalse(assertSuccessCreationDigestPrincipal(null, ""));
+    assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //creation simple principal check section
+    assertFalse(assertSuccessCreationSimplePrincipal("", ""));
+    assertFalse(assertSuccessCreationSimplePrincipal(null, null));
+    assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //exceptions check section
+    assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+    assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall(
+        DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+  }
+
+  @Test
+  public void testAuthMethodReadWrite() throws IOException {
+    DataInputBuffer in = new DataInputBuffer();
+    DataOutputBuffer out = new DataOutputBuffer();
+
+    assertAuthMethodRead(in, AuthMethod.SIMPLE);
+    assertAuthMethodRead(in, AuthMethod.KERBEROS);
+    assertAuthMethodRead(in, AuthMethod.DIGEST);
+
+    assertAuthMethodWrite(out, AuthMethod.SIMPLE);
+    assertAuthMethodWrite(out, AuthMethod.KERBEROS);
+    assertAuthMethodWrite(out, AuthMethod.DIGEST);
+  }
+
+  private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod)
+      throws IOException {
+    in.reset(new byte[] {authMethod.code}, 1);
+    assertEquals(authMethod, AuthMethod.read(in));
+  }
+
+  private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod)
+      throws IOException {
+    authMethod.write(out);
+    assertEquals(authMethod.code, out.getData()[0]);
+    out.reset();
+  }
+
+  private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String principal,
+      String password) throws IOException {
+    boolean inState = false;
+    boolean outState = false;
+
+    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+        createTokenMockWithCredentials(principal, password), principal, false) {
+      @Override
+      public SaslClient createDigestSaslClient(String[] mechanismNames,
+          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+              throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+
+      @Override
+      public SaslClient createKerberosSaslClient(String[] mechanismNames,
+          String userFirstPart, String userSecondPart) throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+    };
+
+    try {
+      rpcClient.getInputStream(Mockito.mock(InputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      inState = true;
+    }
+
+    try {
+      rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      outState = true;
+    }
+
+    return inState && outState;
+  }
+
+  private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
+    try {
+      new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false) {
+        @Override
+        public SaslClient createDigestSaslClient(String[] mechanismNames,
+            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+                throws IOException {
+          return null;
+        }
+
+        @Override
+        public SaslClient createKerberosSaslClient(String[] mechanismNames,
+            String userFirstPart, String userSecondPart) throws IOException {
+          return null;
+        }
+      };
+      return false;
+    } catch (IOException ex) {
+      return true;
+    }
+  }
+
+  private boolean assertSuccessCreationKerberosPrincipal(String principal) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientForKerberos(principal);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationSimplePrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientSimple(principal, password);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
+  }
+
+  private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
+      String principal, String password)
+      throws IOException {
+    Token<? extends TokenIdentifier> token = createTokenMock();
+    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) {
+      when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+      when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    }
+    return token;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<? extends TokenIdentifier> createTokenMock() {
+    return mock(Token.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
new file mode 100644
index 0000000..6c99739
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import javax.security.sasl.Sasl;
+import java.util.Map;
+
+/**
+ * Unit tests for {@code SaslUtil.initSaslProperties(String)}: checks the {@link Sasl#QOP} entry
+ * of the returned properties for valid protection names and the rejection of an invalid one.
+ */
+@Category({SecurityTests.class, SmallTests.class})
+public class TestSaslUtil {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Valid protection names, alone or comma separated, are translated to their SASL QOP tokens
+   * and keep the order in which they were given.
+   */
+  @Test
+  public void testInitSaslProperties() {
+    Map<String, String> props;
+
+    // assertEquals takes (expected, actual); keeping that order gives readable failure messages.
+    props = SaslUtil.initSaslProperties("integrity");
+    assertEquals("auth-int", props.get(Sasl.QOP));
+
+    props = SaslUtil.initSaslProperties("privacy,authentication");
+    assertEquals("auth-conf,auth", props.get(Sasl.QOP));
+
+    props = SaslUtil.initSaslProperties("integrity,authentication,privacy");
+    assertEquals("auth-int,auth,auth-conf", props.get(Sasl.QOP));
+  }
+
+  /**
+   * An unrecognised protection name is rejected with {@link IllegalArgumentException}.
+   * <p>
+   * This check has its own test method because the expected exception terminates the method:
+   * statements placed after the throwing call never run.
+   * <p>
+   * NOTE(review): the previous single test also listed {@code initSaslProperties("")} after the
+   * call below, where it was unreachable and therefore never verified. Confirm against SaslUtil
+   * whether an empty value is rejected or falls back to a default QOP, then add a dedicated test.
+   */
+  @Test
+  public void testInitSaslPropertiesRejectsUnknownQop() {
+    exception.expect(IllegalArgumentException.class);
+    SaslUtil.initSaslProperties("xyz");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ed8d37d..d32fca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1413,7 +1413,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
               }
               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
-                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
+                  HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
                       secretManager, this));
               break;
             default:
@@ -1433,7 +1433,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                 public Object run() throws SaslException {
                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                       .getMechanismName(), names[0], names[1],
-                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
+                      HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
                   return null;
                 }
               });

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index b9e56d9..450db64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -48,11 +49,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 public class HBaseSaslRpcServer {
   private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
 
+  private static Map<String, String> saslProps = null;
+
   public static void init(Configuration conf) {
-    SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 
+    saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
           QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
   }
 
+  public static Map<String, String> getSaslProps() {
+    return saslProps;
+  }
+
   public static <T extends TokenIdentifier> T getIdentifier(String id,
       SecretManager<T> secretManager) throws InvalidToken {
     byte[] tokenId = SaslUtil.decodeIdentifier(id);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
new file mode 100644
index 0000000..6145838
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
+import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.BlockingService;
+
+import javax.security.sasl.SaslException;
+
+/**
+ * Base class for secure RPC tests. A {@link MiniKdc} is started once per test class and a
+ * Kerberos login is performed before every test; each test then starts an {@link RpcServer},
+ * connects with the {@link RpcClient} implementation chosen by the subclass through
+ * {@link #getRpcClientClass()} and issues one call.
+ */
+public abstract class AbstractTestSecureIPC {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
+      .getPath());
+
+  private static MiniKdc KDC;
+  private static final String HOST = "localhost";
+  private static String PRINCIPAL;
+
+  String krbKeytab;
+  String krbPrincipal;
+  UserGroupInformation ugi;
+  Configuration clientConf;
+  Configuration serverConf;
+
+  /**
+   * @return the {@link RpcClient} implementation the client side of each test should use
+   */
+  abstract Class<? extends RpcClient> getRpcClientClass();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Starts the KDC, creates the {@code hbase/localhost} principal with its keytab and publishes
+   * both through {@link HBaseKerberosUtils} for the tests to pick up.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Properties conf = MiniKdc.createConf();
+    conf.put(MiniKdc.DEBUG, true);
+    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
+    KDC.start();
+    PRINCIPAL = "hbase/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+  }
+
+  /** Stops the KDC, if it was started, and removes the test data directory. */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (KDC != null) {
+      KDC.stop();
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * Logs in from the keytab and creates fresh client and server configurations, so settings
+   * changed by one test do not carry over to the next.
+   */
+  @Before
+  public void setUpTest() throws Exception {
+    krbKeytab = getKeytabFileForTesting();
+    krbPrincipal = getPrincipalForTesting();
+    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+    clientConf = getSecuredConfiguration();
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
+    serverConf = getSecuredConfiguration();
+  }
+
+  /** The Kerberos login user can complete an RPC against the secured server. */
+  @Test
+  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
+    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
+
+    // check that the login user is okay:
+    assertSame(ugi, ugi2);
+    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
+    assertEquals(krbPrincipal, ugi.getUserName());
+
+    callRpcService(User.create(ugi2));
+  }
+
+  /**
+   * A client configured for "simple" security can complete an RPC when the server enables
+   * {@link RpcServer#FALLBACK_TO_INSECURE_CLIENT_AUTH}.
+   */
+  @Test
+  public void testRpcFallbackToSimpleAuth() throws Exception {
+    String clientUsername = "testuser";
+    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
+        new String[]{clientUsername});
+
+    // check that the client user is insecure
+    assertNotSame(ugi, clientUgi);
+    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
+    assertEquals(clientUsername, clientUgi.getUserName());
+
+    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
+    callRpcService(User.create(clientUgi));
+  }
+
+  /**
+   * Sets {@code hbase.rpc.protection} on the client and server configurations.
+   *
+   * @param clientProtection value for the client configuration
+   * @param serverProtection value for the server configuration
+   */
+  void setRpcProtection(String clientProtection, String serverProtection) {
+    clientConf.set("hbase.rpc.protection", clientProtection);
+    serverConf.set("hbase.rpc.protection", serverProtection);
+  }
+
+  /**
+   * Test various combinations of Server and Client qops that share at least one value.
+   * @throws Exception if any of the RPC round trips fails
+   */
+  @Test
+  public void testSaslWithCommonQop() throws Exception {
+    setRpcProtection("privacy,authentication", "authentication");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("authentication", "privacy,authentication");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("integrity,authentication", "privacy,authentication");
+    callRpcService(User.create(ugi));
+  }
+
+  /** Client and server qops without a common value must fail with a {@link SaslException}. */
+  @Test
+  public void testSaslNoCommonQop() throws Exception {
+    exception.expect(SaslException.class);
+    exception.expectMessage("No common protection layer between client and server");
+    setRpcProtection("integrity", "privacy");
+    callRpcService(User.create(ugi));
+  }
+
+  /**
+   * Switches Hadoop security to Kerberos and logs in from the given keytab.
+   *
+   * @param krbKeytab path of the keytab file
+   * @param krbPrincipal principal to log in as
+   * @return the resulting login user
+   */
+  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+      throws Exception {
+    Configuration cnf = new Configuration();
+    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(cnf);
+    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Sets up a RPC Server and a Client, does one RPC and checks the result. If the calling thread
+   * dies with an exception, this method throws the root cause of that exception. The client and
+   * the server are always shut down, even when the call or the client shutdown fails.
+   *
+   * @param clientUser user the client connects as
+   * @throws Exception the root cause of a failed call, or any setup failure
+   */
+  private void callRpcService(User clientUser) throws Exception {
+    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+    Mockito.when(securityInfoMock.getServerPrincipal())
+        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
+
+    boolean delayReturnValue = false;
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
+    BlockingService service =
+        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
+
+    RpcServerInterface rpcServer =
+        new RpcServer(null, "testSecuredDelayedRpc",
+            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
+            serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    RpcClient rpcClient = null;
+    try {
+      // Created inside the try block so a failure here still stops the running server.
+      rpcClient =
+          RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      BlockingRpcChannel channel =
+          rpcClient.createBlockingRpcChannel(
+            ServerName.valueOf(address.getHostName(), address.getPort(),
+            System.currentTimeMillis()), clientUser, 0);
+      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
+          TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
+      List<Integer> results = new ArrayList<>();
+      TestThread th1 = new TestThread(stub, true, results);
+      // One-element holder so the anonymous handler can hand the failure back to this thread.
+      // Named "uncaught" to avoid shadowing the ExpectedException rule field "exception".
+      final Throwable[] uncaught = new Throwable[1];
+      Thread.UncaughtExceptionHandler exceptionHandler =
+          new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread th, Throwable ex) {
+              uncaught[0] = ex;
+            }
+          };
+      th1.setUncaughtExceptionHandler(exceptionHandler);
+      th1.start();
+      th1.join();
+      if (uncaught[0] != null) {
+        throwRootCause(uncaught[0]);
+      }
+
+      assertEquals(0xDEADBEEF, results.get(0).intValue());
+    } finally {
+      try {
+        if (rpcClient != null) {
+          rpcClient.close();
+        }
+      } finally {
+        // Nested finally: the server is stopped even if closing the client throws.
+        rpcServer.stop();
+      }
+    }
+  }
+
+  /**
+   * Walks to the deepest cause of {@code failure} and rethrows it with its own type, so an
+   * {@link Error} is not hidden behind a {@link ClassCastException}.
+   *
+   * @param failure the throwable that terminated the calling thread; must not be null
+   * @throws Exception the root cause when it is an {@link Exception}
+   */
+  private static void throwRootCause(Throwable failure) throws Exception {
+    Throwable root = failure;
+    while (root.getCause() != null) {
+      root = root.getCause();
+    }
+    if (root instanceof Error) {
+      throw (Error) root;
+    }
+    if (root instanceof Exception) {
+      throw (Exception) root;
+    }
+    // A Throwable that is neither Error nor Exception cannot be rethrown as-is from here.
+    throw new IllegalStateException("RPC thread died with an unexpected throwable", root);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
new file mode 100644
index 0000000..ea37915
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the tests inherited from {@link AbstractTestSecureIPC} with {@link AsyncRpcClient} as
+ * the client implementation.
+ */
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
+
+  /**
+   * @return {@link AsyncRpcClient}, the client implementation exercised by this test class
+   */
+  @Override
+  Class<? extends RpcClient> getRpcClientClass() {
+    return AsyncRpcClient.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
deleted file mode 100644
index 96ed986..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.TextOutputCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.SaslClient;
-
-import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mockito;
-
-import com.google.common.base.Strings;
-
-@Category({SecurityTests.class, SmallTests.class})
-public class TestHBaseSaslRpcClient {
-  
-  static {
-    System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
-    System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
-  }
-  
-  static final String DEFAULT_USER_NAME = "principal";
-  static final String DEFAULT_USER_PASSWORD = "password";
-
-  private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class);
-
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @BeforeClass
-  public static void before() {
-    Logger.getRootLogger().setLevel(Level.DEBUG);
-  }
-
-  @Test
-  public void testSaslQOPNotEmpty() throws Exception {
-    Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
-        DEFAULT_USER_PASSWORD);
-    // default QOP is authentication
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false);
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        AUTHENTICATION.getSaslQop()));
-
-    // check with specific QOPs
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "authentication");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        AUTHENTICATION.getSaslQop()));
-
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "privacy");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        PRIVACY.getSaslQop()));
-
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "integrity");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        INTEGRITY.getSaslQop()));
-
-    exception.expect(IllegalArgumentException.class);
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "wrongvalue");
-  }
-
-  @Test
-  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
-    final Token<? extends TokenIdentifier> token = createTokenMock();
-    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-
-    final NameCallback nameCallback = mock(NameCallback.class);
-    final PasswordCallback passwordCallback = mock(PasswordCallback.class);
-    final RealmCallback realmCallback = mock(RealmCallback.class);
-    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
-
-    Callback[] callbackArray = {nameCallback, passwordCallback,
-        realmCallback, realmChoiceCallback};
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
-    saslClCallbackHandler.handle(callbackArray);
-    verify(nameCallback).setName(anyString());
-    verify(realmCallback).setText(anyString());
-    verify(passwordCallback).setPassword(any(char[].class));
-  }
-
-  @Test
-  public void testSaslClientCallbackHandlerWithException() {
-    final Token<? extends TokenIdentifier> token = createTokenMock();
-    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
-    try {
-      saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
-    } catch (UnsupportedCallbackException expEx) {
-      //expected
-    } catch (Exception ex) {
-      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testHBaseSaslRpcClientCreation() throws Exception {
-    //creation kerberos principal check section
-    assertFalse(assertSuccessCreationKerberosPrincipal(null));
-    assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM"));
-    assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM"));
-    if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) {
-      // XXX: This can fail if kerberos support in the OS is not sane, see HBASE-10107.
-      // For now, don't assert, just warn
-      LOG.warn("Could not create a SASL client with valid Kerberos credential");
-    }
-
-    //creation digest principal check section
-    assertFalse(assertSuccessCreationDigestPrincipal(null, null));
-    assertFalse(assertSuccessCreationDigestPrincipal("", ""));
-    assertFalse(assertSuccessCreationDigestPrincipal("", null));
-    assertFalse(assertSuccessCreationDigestPrincipal(null, ""));
-    assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-
-    //creation simple principal check section
-    assertFalse(assertSuccessCreationSimplePrincipal("", ""));
-    assertFalse(assertSuccessCreationSimplePrincipal(null, null));
-    assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-
-    //exceptions check section
-    assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-    assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall(
-        DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-  }
-
-  @Test
-  public void testAuthMethodReadWrite() throws IOException {
-    DataInputBuffer in = new DataInputBuffer();
-    DataOutputBuffer out = new DataOutputBuffer();
-
-    assertAuthMethodRead(in, AuthMethod.SIMPLE);
-    assertAuthMethodRead(in, AuthMethod.KERBEROS);
-    assertAuthMethodRead(in, AuthMethod.DIGEST);
-
-    assertAuthMethodWrite(out, AuthMethod.SIMPLE);
-    assertAuthMethodWrite(out, AuthMethod.KERBEROS);
-    assertAuthMethodWrite(out, AuthMethod.DIGEST);
-  }
-
-  private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod)
-      throws IOException {
-    in.reset(new byte[] {authMethod.code}, 1);
-    assertEquals(authMethod, AuthMethod.read(in));
-  }
-
-  private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod)
-      throws IOException {
-    authMethod.write(out);
-    assertEquals(authMethod.code, out.getData()[0]);
-    out.reset();
-  }
-
-  private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String principal,
-      String password) throws IOException {
-    boolean inState = false;
-    boolean outState = false;
-
-    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-        createTokenMockWithCredentials(principal, password), principal, false) {
-      @Override
-      public SaslClient createDigestSaslClient(String[] mechanismNames,
-          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-              throws IOException {
-        return Mockito.mock(SaslClient.class);
-      }
-
-      @Override
-      public SaslClient createKerberosSaslClient(String[] mechanismNames,
-          String userFirstPart, String userSecondPart) throws IOException {
-        return Mockito.mock(SaslClient.class);
-      }
-    };
-    
-    try {
-      rpcClient.getInputStream(Mockito.mock(InputStream.class));
-    } catch(IOException ex) {
-      //Sasl authentication exchange hasn't completed yet
-      inState = true;
-    }
-
-    try {
-      rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
-    } catch(IOException ex) {
-      //Sasl authentication exchange hasn't completed yet
-      outState = true;
-    }
-
-    return inState && outState;
-  }
-
-  private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
-    try {
-      new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-          createTokenMockWithCredentials(principal, password), principal, false) {
-        @Override
-        public SaslClient createDigestSaslClient(String[] mechanismNames,
-            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-                throws IOException {
-          return null;
-        }
-  
-        @Override
-        public SaslClient createKerberosSaslClient(String[] mechanismNames,
-            String userFirstPart, String userSecondPart) throws IOException {
-          return null;
-        }
-      };
-      return false;
-    } catch (IOException ex) {
-      return true;
-    }
-  }
-
-  private boolean assertSuccessCreationKerberosPrincipal(String principal) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = createSaslRpcClientForKerberos(principal);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-          createTokenMockWithCredentials(principal, password), principal, false);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private boolean assertSuccessCreationSimplePrincipal(String principal, String password) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = createSaslRpcClientSimple(principal, password);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
-      throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
-  }
-
-  private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
-      String principal, String password)
-      throws IOException {
-    Token<? extends TokenIdentifier> token = createTokenMock();
-    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) {
-      when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-      when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-    }
-    return token;
-  }
-
-  private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
-      throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
-  }
-
-  @SuppressWarnings("unchecked")
-  private Token<? extends TokenIdentifier> createTokenMock() {
-    return mock(Token.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
new file mode 100644
index 0000000..98ea221
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestSecureIPC extends AbstractTestSecureIPC {
+
+  Class<? extends RpcClient> getRpcClientClass() {
+    return RpcClientImpl.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
deleted file mode 100644
index 769e014..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
-import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
-import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-
-@Category({ SecurityTests.class, SmallTests.class })
-public class TestSecureRPC {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
-      .getPath());
-
-  private static MiniKdc KDC;
-
-  private static String HOST = "localhost";
-
-  private static String PRINCIPAL;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    Properties conf = MiniKdc.createConf();
-    conf.put(MiniKdc.DEBUG, true);
-    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
-    KDC.start();
-    PRINCIPAL = "hbase/" + HOST;
-    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
-    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
-    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (KDC != null) {
-      KDC.stop();
-    }
-    TEST_UTIL.cleanupTestDir();
-  }
-
-  @Test
-  public void testRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testAsyncRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
-  }
-
-  @Test
-  public void testAsyncRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
-  }
-
-  private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
-      throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
-
-    // check that the login user is okay:
-    assertSame(ugi, ugi2);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    Configuration clientConf = getSecuredConfiguration();
-    callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
-  }
-
-  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
-      throws Exception {
-    Configuration cnf = new Configuration();
-    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(cnf);
-    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
-    return UserGroupInformation.getLoginUser();
-  }
-
-  private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
-                              Configuration clientConf, boolean allowInsecureFallback)
-      throws Exception {
-    Configuration clientConfCopy = new Configuration(clientConf);
-    clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
-
-    Configuration conf = getSecuredConfiguration();
-    conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
-
-    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
-    SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
-
-    boolean delayReturnValue = false;
-    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
-    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
-    BlockingService service =
-        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-
-    RpcServerInterface rpcServer =
-        new RpcServer(null, "testSecuredDelayedRpc",
-            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
-            conf, new FifoRpcScheduler(conf, 1));
-    rpcServer.start();
-    RpcClient rpcClient =
-        RpcClientFactory.createClient(clientConfCopy, HConstants.DEFAULT_CLUSTER_ID.toString());
-    try {
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      BlockingRpcChannel channel =
-          rpcClient.createBlockingRpcChannel(
-            ServerName.valueOf(address.getHostName(), address.getPort(),
-            System.currentTimeMillis()), clientUser, 5000);
-      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-          TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-      List<Integer> results = new ArrayList<Integer>();
-      TestThread th1 = new TestThread(stub, true, results);
-      th1.start();
-      th1.join();
-
-      assertEquals(0xDEADBEEF, results.get(0).intValue());
-    } finally {
-      rpcClient.close();
-      rpcServer.stop();
-    }
-  }
-
-  public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    String clientUsername = "testuser";
-    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
-        new String[]{clientUsername});
-
-    // check that the client user is insecure
-    assertNotSame(ugi, clientUgi);
-    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
-    assertEquals(clientUsername, clientUgi.getUserName());
-
-    Configuration clientConf = new Configuration();
-    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
-    callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
-  }
-}
\ No newline at end of file