You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2018/09/19 02:57:20 UTC

hbase git commit: HBASE-20993. [Auth] IPC client fallback to simple auth allowed doesn't work

Repository: hbase
Updated Branches:
  refs/heads/branch-1 c458757f1 -> 8dbb0b048


HBASE-20993. [Auth] IPC client fallback to simple auth allowed doesn't work

Amending-Author: Reid Chan <re...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8dbb0b04
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8dbb0b04
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8dbb0b04

Branch: refs/heads/branch-1
Commit: 8dbb0b04876d746c20c661220b58b98a1a000a94
Parents: c458757
Author: jackbearden <ja...@jackbearden.com>
Authored: Fri Aug 24 18:57:18 2018 -0700
Committer: Reid Chan <re...@apache.org>
Committed: Wed Sep 19 10:53:24 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/BlockingRpcConnection.java |  22 ++
 .../hbase/ipc/FallbackDisallowedException.java  |   2 +-
 .../hadoop/hbase/ipc/NettyRpcConnection.java    |  34 ++-
 .../hbase/ipc/NettyRpcNegotiateHandler.java     |  93 ++++++++
 .../apache/hadoop/hbase/ipc/RpcConnection.java  |   2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  25 ++-
 .../ipc/TestRpcServerSlowConnectionSetup.java   |   3 +
 .../hadoop/hbase/security/TestInsecureIPC.java  | 221 +++++++++++++++++++
 .../hadoop/hbase/security/TestSecureIPC.java    |  13 +-
 9 files changed, 390 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index d5cf6a2..89c4999 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -440,6 +442,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
         // Write out the preamble -- MAGIC, version, and auth to use.
         writeConnectionHeaderPreamble(outStream);
+        readPreambleResponse(inStream);
         if (useSasl) {
           final InputStream in2 = inStream;
           final OutputStream out2 = outStream;
@@ -499,6 +502,25 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
     thread.start();
   }
 
+  /**
+   * Reads the server's reply to the connection preamble and decides how authentication
+   * proceeds.
+   * <p>
+   * The reply starts with a status int. When it equals {@code SaslStatus.SUCCESS.state} a second
+   * int follows: either the success value again (carry on with the authentication method that
+   * was requested) or {@link SaslUtil#SWITCH_TO_SIMPLE_AUTH} (the server wants SIMPLE auth).
+   * Any other combination is handed to {@code readResponse()}.
+   *
+   * @param inStream raw input stream of the connection's socket
+   * @throws FallbackDisallowedException if the server asks for SIMPLE auth but this client does
+   *           not allow falling back
+   * @throws IOException if the reply cannot be read
+   */
+  private void readPreambleResponse(InputStream inStream) throws IOException {
+    // Deliberately unbuffered. This wrapper is thrown away when the method returns, so a
+    // BufferedInputStream here could read ahead past the two status ints and silently drop
+    // whatever the server sent right after them.
+    DataInputStream resultCode = new DataInputStream(inStream);
+    int state = resultCode.readInt();
+    if (state == SaslStatus.SUCCESS.state) {
+      int fallback = resultCode.readInt();
+      if (fallback == SaslStatus.SUCCESS.state) {
+        // Server accepted the requested authentication method.
+        return;
+      }
+      if (fallback == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
+        if (!this.rpcClient.fallbackAllowed) {
+          throw new FallbackDisallowedException();
+        }
+        // Skip the SASL handshake for the rest of the connection setup.
+        useSasl = false;
+        return;
+      }
+    }
+    // NOTE(review): presumably surfaces the error the server sent along with a non-success
+    // status; readResponse() is defined elsewhere in this class -- confirm which stream it reads.
+    readResponse();
+  }
+
   /**
    * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
index 721148b..77b5429 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
@@ -32,7 +32,7 @@ public class FallbackDisallowedException extends HBaseIOException {
   private static final long serialVersionUID = -6942845066279358253L;
 
   public FallbackDisallowedException() {
-    super("Server asks us to fall back to SIMPLE auth, "
+    super("Server asked us to fall back to SIMPLE auth, "
         + "but this client is configured to only allow secure connections.");
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index b5fb7e4..4bd7d1f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -181,6 +181,33 @@ class NettyRpcConnection extends RpcConnection {
     }
   }
 
+  /**
+   * Sends the connection preamble and waits for the server's preamble response before moving on.
+   * <p>
+   * A {@link NettyRpcNegotiateHandler} is placed at the head of the pipeline to consume the
+   * response and complete a promise. On success the handler is removed and the connection
+   * continues with SASL negotiation or is marked established, depending on {@code useSasl}
+   * (which the handler clears when falling back to SIMPLE auth). On failure a relogin is
+   * scheduled and connection initialization is failed.
+   *
+   * @param ch the freshly connected channel
+   */
+  private void negotiate(final Channel ch) {
+    ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
+
+    Promise<Boolean> negotiated = ch.eventLoop().newPromise();
+    ch.pipeline().addFirst(
+        new NettyRpcNegotiateHandler(this, negotiated, rpcClient.fallbackAllowed));
+    negotiated.addListener(new FutureListener<Boolean>() {
+
+      @Override
+      public void operationComplete(Future<Boolean> outcome) throws Exception {
+        if (!outcome.isSuccess()) {
+          final Throwable cause = outcome.cause();
+          scheduleRelogin(cause);
+          failInit(ch, toIOE(cause));
+          return;
+        }
+        // The preamble exchange is over; the handler has no further role in the pipeline.
+        ch.pipeline().remove(NettyRpcNegotiateHandler.class);
+        if (useSasl) {
+          saslNegotiate(ch);
+        } else {
+          established(ch);
+        }
+      }
+    });
+  }
+
   private void saslNegotiate(final Channel ch) {
     UserGroupInformation ticket = getUGI();
     if (ticket == null) {
@@ -236,12 +263,7 @@ class NettyRpcConnection extends RpcConnection {
               rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
               return;
             }
-            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
-            if (useSasl) {
-              saslNegotiate(ch);
-            } else {
-              established(ch);
-            }
+            negotiate(ch);
           }
         }).channel();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcNegotiateHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcNegotiateHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcNegotiateHandler.java
new file mode 100644
index 0000000..6436ef6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcNegotiateHandler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.SaslUtil;
+
+/**
+ * Netty RPC client handler that consumes the server's response to the connection preamble.
+ * <p>
+ * The outcome is reported through the promise passed to the constructor: it succeeds when the
+ * server accepts the connection (optionally after switching this connection to SIMPLE auth) and
+ * fails when the server reports an error, when a requested fallback is not allowed, or when the
+ * response cannot be decoded.
+ */
+@InterfaceAudience.Private
+public class NettyRpcNegotiateHandler extends ChannelDuplexHandler {
+  private static final Log LOG = LogFactory.getLog(NettyRpcNegotiateHandler.class);
+  private final RpcConnection conn;
+  private final Promise<Boolean> promise;
+  private final boolean fallbackAllowed;
+
+  /**
+   * @param conn connection whose {@code useSasl} flag is cleared on fallback and which is shut
+   *          down when an exception reaches this handler
+   * @param promise completed with the result of the preamble exchange
+   * @param fallbackAllowed whether this client may fall back to SIMPLE auth when asked to
+   */
+  public NettyRpcNegotiateHandler(RpcConnection conn, Promise<Boolean> promise,
+                                  boolean fallbackAllowed) {
+    this.conn = conn;
+    this.promise = promise;
+    this.fallbackAllowed = fallbackAllowed;
+  }
+
+  /**
+   * Handles a server request to switch to SIMPLE auth.
+   *
+   * @return {@code true} if the connection was switched to SIMPLE auth, {@code false} if
+   *         fallback is not allowed, in which case the promise has already been failed with a
+   *         {@link FallbackDisallowedException}
+   */
+  private boolean attemptToFallback() {
+    if (fallbackAllowed) {
+      LOG.info("Server asked us to fall back to SIMPLE auth. Falling back...");
+      conn.useSasl = false;
+      return true;
+    }
+    LOG.error("Server asked us to fall back to SIMPLE auth, " +
+        "but we are not configured for that behavior!");
+    handleFailure(new FallbackDisallowedException());
+    return false;
+  }
+
+  /** Fails the negotiation promise unless it has already been completed. */
+  private void handleFailure(Exception e) {
+    promise.tryFailure(e);
+  }
+
+  /**
+   * Decodes the preamble response: a status int, followed on success (status 0) by a second int
+   * that is {@link SaslUtil#SWITCH_TO_SIMPLE_AUTH} when the server wants SIMPLE auth.
+   * <p>
+   * Exactly one outcome is reported per message; the buffer is always released.
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    ByteBuf buf = (ByteBuf) msg;
+    try {
+      // NOTE(review): assumes both ints arrive in the same ByteBuf. A shorter buffer makes
+      // readInt() throw, which is reported as a failure by the catch block below.
+      int status = buf.readInt();
+      if (status != 0) {
+        handleFailure(new IOException("Error while establishing connection to server"));
+        return;
+      }
+      if (buf.readInt() == SaslUtil.SWITCH_TO_SIMPLE_AUTH && !attemptToFallback()) {
+        // Fallback was refused and the promise already carries the failure.
+        return;
+      }
+      promise.trySuccess(true);
+    } catch (Exception e) {
+      handleFailure(e);
+    } finally {
+      buf.release();
+    }
+  }
+
+  @Override
+  public void channelReadComplete(ChannelHandlerContext ctx) {
+    ctx.flush();
+  }
+
+  /** Shuts the connection down when an exception reaches this handler. */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    conn.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 5e9e97e..93881dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -58,7 +58,7 @@ abstract class RpcConnection {
 
   protected final AuthMethod authMethod;
 
-  protected final boolean useSasl;
+  protected boolean useSasl;
 
   protected final Token<? extends TokenIdentifier> token;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3f11233..8ebb32f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1301,7 +1301,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     private AuthMethod authMethod;
     private boolean saslContextEstablished;
-    private boolean skipInitialSaslHandshake;
     private ByteBuffer unwrappedData;
     // When is this set?  FindBugs wants to know!  Says NP
     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
@@ -1578,6 +1577,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       preambleBuffer.flip();
       for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
         if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
+          doRawSaslReply(SaslStatus.ERROR, null, null, null);
           return doBadPreambleHandling("Expected HEADER=" +
               Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
               Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
@@ -1588,18 +1588,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
       this.authMethod = AuthMethod.valueOf(authbyte);
       if (version != CURRENT_VERSION) {
+        doRawSaslReply(SaslStatus.ERROR, null, null, null);
         String msg = getFatalConnectionString(version, authbyte);
         return doBadPreambleHandling(msg, new WrongVersionException(msg));
       }
       if (authMethod == null) {
+        doRawSaslReply(SaslStatus.ERROR, null, null, null);
         String msg = getFatalConnectionString(version, authbyte);
         return doBadPreambleHandling(msg, new BadAuthException(msg));
       }
       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+        // server side uses non-simple auth, client side uses simple auth.
         if (allowFallbackToSimpleAuth) {
+          doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
           metrics.authenticationFallback();
           authenticatedWithFallback = true;
         } else {
+          doRawSaslReply(SaslStatus.ERROR, null, null, null);
           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
           responder.doRespond(authFailedCall);
@@ -1607,16 +1612,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+        // server side uses simple auth, client side uses non-simple auth.
         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
         authMethod = AuthMethod.SIMPLE;
-        // client has already sent the initial Sasl message and we
-        // should ignore it. Both client and server should fall back
-        // to simple auth from now on.
-        skipInitialSaslHandshake = true;
-      }
-      if (authMethod != AuthMethod.SIMPLE) {
+      } else if (authMethod != AuthMethod.SIMPLE) {
+        // both server and client side use non-simple auth.
         useSasl = true;
+        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
+      } else if (!isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+        // both server and client side use simple auth.
+        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null);
       }
 
       preambleBuffer = null; // do not need it anymore
@@ -1768,11 +1774,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private void process() throws IOException, InterruptedException {
       data.flip();
       try {
-        if (skipInitialSaslHandshake) {
-          skipInitialSaslHandshake = false;
-          return;
-        }
-
         if (useSasl) {
           saslReadAndProcess(data);
         } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
index b024d76..1b239f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -89,6 +89,9 @@ public class TestRpcServerSlowConnectionSetup {
     Thread.sleep(5000);
     socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
     socket.getOutputStream().flush();
+    DataInputStream pis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
+    int responseCode = pis.readInt();
+    assertEquals(responseCode, 0);
 
     ConnectionHeader header = ConnectionHeader.newBuilder()
         .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java
new file mode 100644
index 0000000..01ba235
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.ServiceException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category({SecurityTests.class, SmallTests.class})
+public class TestInsecureIPC {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final File KEYTAB_FILE =
+      new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+
+  private static MiniKdc KDC;
+  private static String HOST = "localhost";
+  private static String PRINCIPAL;
+
+  String krbKeytab;
+  String krbPrincipal;
+
+  Configuration clientConf;
+  Configuration serverConf;
+  UserGroupInformation ugi;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Parameterized.Parameters(name = "{index}: rpcClientImpl={0}")
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
+        new Object[]{NettyRpcClient.class.getName()});
+  }
+
+  @Parameterized.Parameter
+  public String rpcClientImpl;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
+    PRINCIPAL = "hbase/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (KDC != null) {
+      KDC.stop();
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUpTest() throws Exception {
+    krbKeytab = getKeytabFileForTesting();
+    krbPrincipal = getPrincipalForTesting();
+    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+    clientConf = getSecuredConfiguration();
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
+    serverConf = HBaseConfiguration.create();
+  }
+
+  @Test
+  public void testRpcInsecureClientAgainstInsecureServer() throws Exception {
+    String clientUsername = "testuser";
+    UserGroupInformation clientUgi =
+        UserGroupInformation.createUserForTesting(clientUsername, new String[]{clientUsername});
+
+    assertNotSame(ugi, clientUgi);
+    assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE,
+        clientUgi.getAuthenticationMethod());
+    assertEquals(clientUsername, clientUgi.getUserName());
+
+    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    callInsecureRpcService(User.create(clientUgi));
+  }
+
+  @Test
+  public void testRpcFallbackToSimpleFromKerberosClientAgainstInsecureServer() throws Exception {
+    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
+
+    assertSame(ugi, ugi2);
+    assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
+    assertEquals(krbPrincipal, ugi.getUserName());
+
+    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
+    callInsecureRpcService(User.create(ugi2));
+  }
+
+  private void callInsecureRpcService(User clientUser) throws Exception {
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+
+    RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC",
+        Lists.newArrayList(
+            new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE,
+                null)),
+        isa, serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    try (RpcClient rpcClient =
+             RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
+      TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub);
+      final Throwable[] exception = new Throwable[1];
+      Collections.synchronizedList(new ArrayList<Throwable>());
+      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
+        public void uncaughtException(Thread th, Throwable ex) {
+          exception[0] = ex;
+        }
+      };
+      th1.setUncaughtExceptionHandler(exceptionHandler);
+      th1.start();
+      th1.join();
+      if (exception[0] != null) {
+        // throw root cause.
+        while (exception[0].getCause() != null) {
+          exception[0] = exception[0].getCause();
+        }
+        throw (Exception) exception[0];
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+      throws Exception {
+    Configuration cnf = new Configuration();
+    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(cnf);
+    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  public static class TestThread extends Thread {
+    private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+
+    public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) {
+      this.stub = stub;
+    }
+
+    @Override
+    public void run() {
+      try {
+        int[] messageSize = new int[]{100, 1000, 10000};
+        for (int i = 0; i < messageSize.length; i++) {
+          String input = RandomStringUtils.random(messageSize[i]);
+          String result =
+              stub.echo(null,
+                  TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
+          assertEquals(input, result);
+        }
+      } catch (ServiceException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8dbb0b04/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
index b016bd3..9d74ec2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -176,13 +176,16 @@ public class TestSecureIPC {
     setRpcProtection("authentication", "privacy,authentication");
     callRpcService(User.create(ugi));
 
-    setRpcProtection("integrity,authentication", "privacy,authentication");
+    setRpcProtection("integrity,authentication",
+        "privacy,authentication");
     callRpcService(User.create(ugi));
 
-    setRpcProtection("integrity,authentication", "integrity,authentication");
+    setRpcProtection("integrity,authentication",
+        "integrity,authentication");
     callRpcService(User.create(ugi));
 
-    setRpcProtection("privacy,authentication", "privacy,authentication");
+    setRpcProtection("privacy,authentication",
+        "privacy,authentication");
     callRpcService(User.create(ugi));
   }
 
@@ -262,8 +265,8 @@ public class TestSecureIPC {
         for (int i = 0; i < messageSize.length; i++) {
           String input = RandomStringUtils.random(messageSize[i]);
           String result =
-              stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
-                  .getMessage();
+              stub.echo(null,
+                  TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()).getMessage();
           assertEquals(input, result);
         }
       } catch (ServiceException e) {