You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/08/06 23:10:04 UTC

[hbase] branch master updated: HBASE-26666 Add native TLS encryption support to RPC server/client (#4666)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f8dcf070553 HBASE-26666 Add native TLS encryption support to RPC server/client (#4666)
f8dcf070553 is described below

commit f8dcf0705538ffeddcaa5eff74c8c2f61f7a844a
Author: Andor Molnár <an...@cloudera.com>
AuthorDate: Sun Aug 7 01:09:54 2022 +0200

    HBASE-26666 Add native TLS encryption support to RPC server/client (#4666)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java    |  18 +
 .../hadoop/hbase/ipc/NettyRpcConnection.java       |  52 ++-
 hbase-common/pom.xml                               |  10 +
 .../hbase/exceptions/KeyManagerException.java      |  33 ++
 .../hbase/exceptions/SSLContextException.java      |  37 ++
 .../hbase/exceptions/TrustManagerException.java    |  33 ++
 .../hadoop/hbase/exceptions/X509Exception.java     |  43 +++
 .../hbase/io/crypto/tls/KeyStoreFileType.java      | 122 ++++++
 .../hadoop/hbase/io/crypto/tls/X509Util.java       | 339 +++++++++++++++++
 .../crypto/tls/BaseX509ParameterizedTestCase.java  | 104 +++++
 .../hadoop/hbase/io/crypto/tls/TestX509Util.java   | 389 +++++++++++++++++++
 .../hadoop/hbase/io/crypto/tls/X509KeyType.java    |  32 ++
 .../hbase/io/crypto/tls/X509TestContext.java       | 422 +++++++++++++++++++++
 .../hbase/io/crypto/tls/X509TestHelpers.java       | 419 ++++++++++++++++++++
 hbase-server/pom.xml                               |  10 +
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |  24 ++
 .../hadoop/hbase/ipc/TestNettyIPCSslFailure.java   |  90 +++++
 .../apache/hadoop/hbase/security/TestTlsIPC.java   | 209 ++++++++++
 .../hadoop/hbase/security/TestTlsWithKerberos.java | 219 +++++++++++
 pom.xml                                            |   6 +
 20 files changed, 2601 insertions(+), 10 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 7b698958ede..b5f0a5e76bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -30,6 +34,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
@@ -44,6 +49,7 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
   final Class<? extends Channel> channelClass;
 
   private final boolean shutdownGroupWhenClose;
+  private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
 
   public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
     MetricsConnection metrics) {
@@ -81,4 +87,16 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
       group.shutdownGracefully();
     }
   }
+
+  SslContext getSslContext() throws X509Exception, SSLException {
+    SslContext result = sslContextForClient.get();
+    if (result == null) {
+      result = X509Util.createSslContextForClient(conf);
+      if (!sslContextForClient.compareAndSet(null, result)) {
+        // lost the race, another thread already set the value
+        result = sslContextForClient.get();
+      }
+    }
+    return result;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index f761ec0b2dc..cd12c7d2073 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
@@ -56,6 +57,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
@@ -278,24 +281,27 @@ class NettyRpcConnection extends RpcConnection {
       .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
       .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
-      .handler(new ChannelInitializer<Channel>() {
-
+      .handler(new ChannelInitializer() {
         @Override
         protected void initChannel(Channel ch) throws Exception {
+          if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
+            SslContext sslContext = rpcClient.getSslContext();
+            SslHandler sslHandler = sslContext.newHandler(ch.alloc(),
+              remoteId.address.getHostName(), remoteId.address.getPort());
+            sslHandler.setHandshakeTimeoutMillis(
+              conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
+                X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
+            ch.pipeline().addFirst(sslHandler);
+            LOG.info("SSL handler added with handshake timeout {} ms",
+              sslHandler.getHandshakeTimeoutMillis());
+          }
           ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
             new BufferCallBeforeInitHandler());
         }
       }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
       .addListener(new ChannelFutureListener() {
 
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          Channel ch = future.channel();
-          if (!future.isSuccess()) {
-            failInit(ch, toIOE(future.cause()));
-            rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
-            return;
-          }
+        private void succeed(Channel ch) throws IOException {
           ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
           if (useSasl) {
             saslNegotiate(ch);
@@ -305,6 +311,32 @@ class NettyRpcConnection extends RpcConnection {
             established(ch);
           }
         }
+
+        private void fail(Channel ch, Throwable error) {
+          failInit(ch, toIOE(error));
+          rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), error);
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          Channel ch = future.channel();
+          if (!future.isSuccess()) {
+            fail(ch, future.cause());
+            return;
+          }
+          SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
+          if (sslHandler != null) {
+            NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> {
+              if (f.isSuccess()) {
+                succeed(ch);
+              } else {
+                fail(ch, f.cause());
+              }
+            });
+          } else {
+            succeed(ch);
+          }
+        }
       }).channel();
   }
 
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 007e75b19d0..4d551979d6d 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -152,6 +152,16 @@
       <artifactId>kerb-simplekdc</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/KeyManagerException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/KeyManagerException.java
new file mode 100644
index 00000000000..ae337d05fe3
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/KeyManagerException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class KeyManagerException extends X509Exception {
+
+  public KeyManagerException(String message) {
+    super(message);
+  }
+
+  public KeyManagerException(Throwable cause) {
+    super(cause);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/SSLContextException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/SSLContextException.java
new file mode 100644
index 00000000000..0999a6f9bd5
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/SSLContextException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class SSLContextException extends X509Exception {
+
+  public SSLContextException(String message) {
+    super(message);
+  }
+
+  public SSLContextException(Throwable cause) {
+    super(cause);
+  }
+
+  public SSLContextException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/TrustManagerException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/TrustManagerException.java
new file mode 100644
index 00000000000..131ad7d2cd3
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/TrustManagerException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TrustManagerException extends X509Exception {
+
+  public TrustManagerException(String message) {
+    super(message);
+  }
+
+  public TrustManagerException(Throwable cause) {
+    super(cause);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/X509Exception.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/X509Exception.java
new file mode 100644
index 00000000000..10bed79351d
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/exceptions/X509Exception.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Exception.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+public class X509Exception extends HBaseException {
+
+  public X509Exception(String message) {
+    super(message);
+  }
+
+  public X509Exception(Throwable cause) {
+    super(cause);
+  }
+
+  public X509Exception(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/KeyStoreFileType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/KeyStoreFileType.java
new file mode 100644
index 00000000000..851b6812423
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/KeyStoreFileType.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This enum represents the file type of a KeyStore or TrustStore. Currently, JKS (Java keystore),
+ * PEM, PKCS12, and BCFKS types are supported.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/KeyStoreFileType.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+public enum KeyStoreFileType {
+  JKS(".jks"),
+  PEM(".pem"),
+  PKCS12(".p12"),
+  BCFKS(".bcfks");
+
+  private final String defaultFileExtension;
+
+  KeyStoreFileType(String defaultFileExtension) {
+    this.defaultFileExtension = defaultFileExtension;
+  }
+
+  /**
+   * The property string that specifies that a key store or trust store should use this store file
+   * type.
+   */
+  public String getPropertyValue() {
+    return this.name();
+  }
+
+  /**
+   * The file extension that is associated with this file type.
+   */
+  public String getDefaultFileExtension() {
+    return defaultFileExtension;
+  }
+
+  /**
+   * Converts a property value to a StoreFileType enum. If the property value is <code>null</code>
+   * or an empty string, returns <code>null</code>.
+   * @param propertyValue the property value.
+   * @return the KeyStoreFileType, or <code>null</code> if <code>propertyValue</code> is
+   *         <code>null</code> or empty.
+   * @throws IllegalArgumentException if <code>propertyValue</code> is not one of "JKS", "PEM",
+   *                                  "BCFKS", "PKCS12", or empty/null.
+   */
+  public static KeyStoreFileType fromPropertyValue(String propertyValue) {
+    if (propertyValue == null || propertyValue.length() == 0) {
+      return null;
+    }
+    return KeyStoreFileType.valueOf(propertyValue.toUpperCase());
+  }
+
+  /**
+   * Detects the type of KeyStore / TrustStore file from the file extension. If the file name ends
+   * with ".jks", returns <code>StoreFileType.JKS</code>. If the file name ends with ".pem", returns
+   * <code>StoreFileType.PEM</code>. If the file name ends with ".p12", returns
+   * <code>StoreFileType.PKCS12</code>. If the file name ends with ".bckfs", returns
+   * <code>StoreFileType.BCKFS</code>. Otherwise, throws an IllegalArgumentException.
+   * @param filename the filename of the key store or trust store file.
+   * @return a KeyStoreFileType.
+   * @throws IllegalArgumentException if the filename does not end with ".jks", ".pem", "p12" or
+   *                                  "bcfks".
+   */
+  public static KeyStoreFileType fromFilename(String filename) {
+    int i = filename.lastIndexOf('.');
+    if (i >= 0) {
+      String extension = filename.substring(i);
+      for (KeyStoreFileType storeFileType : KeyStoreFileType.values()) {
+        if (storeFileType.getDefaultFileExtension().equals(extension)) {
+          return storeFileType;
+        }
+      }
+    }
+    throw new IllegalArgumentException(
+      "Unable to auto-detect store file type from file name: " + filename);
+  }
+
+  /**
+   * If <code>propertyValue</code> is not null or empty, returns the result of
+   * <code>KeyStoreFileType.fromPropertyValue(propertyValue)</code>. Else, returns the result of
+   * <code>KeyStoreFileType.fromFileName(filename)</code>.
+   * @param propertyValue property value describing the KeyStoreFileType, or null/empty to
+   *                      auto-detect the type from the file name.
+   * @param filename      file name of the key store file. The file extension is used to auto-detect
+   *                      the KeyStoreFileType when <code>propertyValue</code> is null or empty.
+   * @return a KeyStoreFileType.
+   * @throws IllegalArgumentException if <code>propertyValue</code> is not one of "JKS", "PEM",
+   *                                  "PKCS12", "BCFKS", or empty/null.
+   * @throws IllegalArgumentException if <code>propertyValue</code>is empty or null and the type
+   *                                  could not be determined from the file name.
+   */
+  public static KeyStoreFileType fromPropertyValueOrFileName(String propertyValue,
+    String filename) {
+    KeyStoreFileType result = KeyStoreFileType.fromPropertyValue(propertyValue);
+    if (result == null) {
+      result = KeyStoreFileType.fromFilename(filename);
+    }
+    return result;
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
new file mode 100644
index 00000000000..78ce833448e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.PKIXBuilderParameters;
+import java.security.cert.X509CertSelector;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.net.ssl.CertPathTrustManagerParameters;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.exceptions.KeyManagerException;
+import org.apache.hadoop.hbase.exceptions.SSLContextException;
+import org.apache.hadoop.hbase.exceptions.TrustManagerException;
+import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ObjectArrays;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+
+/**
+ * Utility code for X509 handling Default cipher suites: Performance testing done by Facebook
+ * engineers shows that on Intel x86_64 machines, Java9 performs better with GCM and Java8 performs
+ * better with CBC, so these seem like reasonable defaults.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+public final class X509Util {
+
+  private static final Logger LOG = LoggerFactory.getLogger(X509Util.class);
+
+  // Config
+  static final String CONFIG_PREFIX = "hbase.rpc.tls.";
+  public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
+  public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location";
+  static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
+  static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
+  static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
+  static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
+  static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
+  public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
+  public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
+  private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + "enabledProtocols";
+  private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + "ciphersuites";
+
+  public static final String HBASE_CLIENT_NETTY_TLS_ENABLED = "hbase.client.netty.tls.enabled";
+  public static final String HBASE_SERVER_NETTY_TLS_ENABLED = "hbase.server.netty.tls.enabled";
+
+  public static final String HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT =
+    "hbase.server.netty.tls.supportplaintext";
+
+  public static final String HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT =
+    "hbase.client.netty.tls.handshaketimeout";
+  public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000;
+
+  public static final String DEFAULT_PROTOCOL = "TLSv1.2";
+
+  private static String[] getGCMCiphers() {
+    return new String[] { "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
+      "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
+      "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" };
+  }
+
+  private static String[] getCBCCiphers() {
+    return new String[] { "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
+      "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
+      "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384",
+      "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
+      "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA" };
+  }
+
+  // On Java 8, prefer CBC ciphers since AES-NI support is lacking and GCM is slower than CBC.
+  private static final String[] DEFAULT_CIPHERS_JAVA8 =
+    ObjectArrays.concat(getCBCCiphers(), getGCMCiphers(), String.class);
+  // On Java 9 and later, prefer GCM ciphers due to improved AES-NI support.
+  // Note that this performance assumption might not hold true for architectures other than x86_64.
+  private static final String[] DEFAULT_CIPHERS_JAVA9 =
+    ObjectArrays.concat(getGCMCiphers(), getCBCCiphers(), String.class);
+
+  private X509Util() {
+    // disabled
+  }
+
+  static String[] getDefaultCipherSuites() {
+    return getDefaultCipherSuitesForJavaVersion(System.getProperty("java.specification.version"));
+  }
+
+  static String[] getDefaultCipherSuitesForJavaVersion(String javaVersion) {
+    Objects.requireNonNull(javaVersion);
+    if (javaVersion.matches("\\d+")) {
+      // Must be Java 9 or later
+      LOG.debug("Using Java9+ optimized cipher suites for Java version {}", javaVersion);
+      return DEFAULT_CIPHERS_JAVA9;
+    } else if (javaVersion.startsWith("1.")) {
+      // Must be Java 1.8 or earlier
+      LOG.debug("Using Java8 optimized cipher suites for Java version {}", javaVersion);
+      return DEFAULT_CIPHERS_JAVA8;
+    } else {
+      LOG.debug("Could not parse java version {}, using Java8 optimized cipher suites",
+        javaVersion);
+      return DEFAULT_CIPHERS_JAVA8;
+    }
+  }
+
+  public static SslContext createSslContextForClient(Configuration config)
+    throws X509Exception, SSLException {
+
+    SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+
+    String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
+    String keyStorePassword = config.get(TLS_CONFIG_KEYSTORE_PASSWORD, "");
+    String keyStoreType = config.get(TLS_CONFIG_KEYSTORE_TYPE, "");
+
+    if (keyStoreLocation.isEmpty()) {
+      LOG.warn(TLS_CONFIG_KEYSTORE_LOCATION + " not specified");
+    } else {
+      sslContextBuilder
+        .keyManager(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
+    }
+
+    String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
+    String trustStorePassword = config.get(TLS_CONFIG_TRUSTSTORE_PASSWORD, "");
+    String trustStoreType = config.get(TLS_CONFIG_TRUSTSTORE_TYPE, "");
+
+    boolean sslCrlEnabled = config.getBoolean(TLS_CONFIG_CLR, false);
+    boolean sslOcspEnabled = config.getBoolean(TLS_CONFIG_OCSP, false);
+
+    if (trustStoreLocation.isEmpty()) {
+      LOG.warn(TLS_CONFIG_TRUSTSTORE_LOCATION + " not specified");
+    } else {
+      sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, trustStorePassword,
+        trustStoreType, sslCrlEnabled, sslOcspEnabled));
+    }
+
+    sslContextBuilder.enableOcsp(sslOcspEnabled);
+    sslContextBuilder.protocols(getEnabledProtocols(config));
+    sslContextBuilder.ciphers(Arrays.asList(getCipherSuites(config)));
+
+    return sslContextBuilder.build();
+  }
+
+  public static SslContext createSslContextForServer(Configuration config)
+    throws X509Exception, SSLException {
+    String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
+    String keyStorePassword = config.get(TLS_CONFIG_KEYSTORE_PASSWORD, "");
+    String keyStoreType = config.get(TLS_CONFIG_KEYSTORE_TYPE, "");
+
+    if (keyStoreLocation.isEmpty()) {
+      throw new SSLContextException(
+        "Keystore is required for SSL server: " + TLS_CONFIG_KEYSTORE_LOCATION);
+    }
+
+    SslContextBuilder sslContextBuilder;
+
+    sslContextBuilder = SslContextBuilder
+      .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
+
+    String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
+    String trustStorePassword = config.get(TLS_CONFIG_TRUSTSTORE_PASSWORD, "");
+    String trustStoreType = config.get(TLS_CONFIG_TRUSTSTORE_TYPE, "");
+
+    boolean sslCrlEnabled = config.getBoolean(TLS_CONFIG_CLR, false);
+    boolean sslOcspEnabled = config.getBoolean(TLS_CONFIG_OCSP, false);
+
+    if (trustStoreLocation.isEmpty()) {
+      LOG.warn(TLS_CONFIG_TRUSTSTORE_LOCATION + " not specified");
+    } else {
+      sslContextBuilder.trustManager(createTrustManager(trustStoreLocation, trustStorePassword,
+        trustStoreType, sslCrlEnabled, sslOcspEnabled));
+    }
+
+    sslContextBuilder.enableOcsp(sslOcspEnabled);
+    sslContextBuilder.protocols(getEnabledProtocols(config));
+    sslContextBuilder.ciphers(Arrays.asList(getCipherSuites(config)));
+
+    return sslContextBuilder.build();
+  }
+
+  /**
+   * Creates a key manager by loading the key store from the given file of the given type,
+   * optionally decrypting it using the given password.
+   * @param keyStoreLocation the location of the key store file.
+   * @param keyStorePassword optional password to decrypt the key store. If empty, assumes the key
+   *                         store is not encrypted.
+   * @param keyStoreType     must be JKS, PEM, PKCS12, BCFKS or null. If null, attempts to
+   *                         autodetect the key store type from the file extension (e.g. .jks /
+   *                         .pem).
+   * @return the key manager.
+   * @throws KeyManagerException if something goes wrong.
+   */
+  static X509KeyManager createKeyManager(String keyStoreLocation, String keyStorePassword,
+    String keyStoreType) throws KeyManagerException {
+
+    if (keyStorePassword == null) {
+      keyStorePassword = "";
+    }
+
+    if (keyStoreType == null) {
+      keyStoreType = "jks";
+    }
+
+    try {
+      char[] password = keyStorePassword.toCharArray();
+      KeyStore ks = KeyStore.getInstance(keyStoreType);
+      try (InputStream inputStream =
+        new BufferedInputStream(Files.newInputStream(new File(keyStoreLocation).toPath()))) {
+        ks.load(inputStream, password);
+      }
+
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
+      kmf.init(ks, password);
+
+      for (KeyManager km : kmf.getKeyManagers()) {
+        if (km instanceof X509KeyManager) {
+          return (X509KeyManager) km;
+        }
+      }
+      throw new KeyManagerException("Couldn't find X509KeyManager");
+    } catch (IOException | GeneralSecurityException | IllegalArgumentException e) {
+      throw new KeyManagerException(e);
+    }
+  }
+
+  /**
+   * Creates a trust manager by loading the trust store from the given file of the given type,
+   * optionally decrypting it using the given password.
+   * @param trustStoreLocation the location of the trust store file.
+   * @param trustStorePassword optional password to decrypt the trust store (only applies to JKS
+   *                           trust stores). If empty, assumes the trust store is not encrypted.
+   * @param trustStoreType     must be JKS, PEM, PKCS12, BCFKS or null. If null, attempts to
+   *                           autodetect the trust store type from the file extension (e.g. .jks /
+   *                           .pem).
+   * @param crlEnabled         enable CRL (certificate revocation list) checks.
+   * @param ocspEnabled        enable OCSP (online certificate status protocol) checks.
+   * @return the trust manager.
+   * @throws TrustManagerException if something goes wrong.
+   */
+  static X509TrustManager createTrustManager(String trustStoreLocation, String trustStorePassword,
+    String trustStoreType, boolean crlEnabled, boolean ocspEnabled) throws TrustManagerException {
+
+    if (trustStorePassword == null) {
+      trustStorePassword = "";
+    }
+
+    if (trustStoreType == null) {
+      trustStoreType = "jks";
+    }
+
+    try {
+      char[] password = trustStorePassword.toCharArray();
+      KeyStore ts = KeyStore.getInstance(trustStoreType);
+      try (InputStream inputStream =
+        new BufferedInputStream(Files.newInputStream(new File(trustStoreLocation).toPath()))) {
+        ts.load(inputStream, password);
+      }
+
+      PKIXBuilderParameters pbParams = new PKIXBuilderParameters(ts, new X509CertSelector());
+      if (crlEnabled || ocspEnabled) {
+        pbParams.setRevocationEnabled(true);
+        System.setProperty("com.sun.net.ssl.checkRevocation", "true");
+        if (crlEnabled) {
+          System.setProperty("com.sun.security.enableCRLDP", "true");
+        }
+        if (ocspEnabled) {
+          Security.setProperty("ocsp.enable", "true");
+        }
+      } else {
+        pbParams.setRevocationEnabled(false);
+      }
+
+      // Revocation checking is only supported with the PKIX algorithm
+      TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+      tmf.init(new CertPathTrustManagerParameters(pbParams));
+
+      for (final TrustManager tm : tmf.getTrustManagers()) {
+        if (tm instanceof X509ExtendedTrustManager) {
+          return (X509ExtendedTrustManager) tm;
+        }
+      }
+      throw new TrustManagerException("Couldn't find X509TrustManager");
+    } catch (IOException | GeneralSecurityException | IllegalArgumentException e) {
+      throw new TrustManagerException(e);
+    }
+  }
+
+  private static String[] getEnabledProtocols(Configuration config) {
+    String enabledProtocolsInput = config.get(TLS_ENABLED_PROTOCOLS);
+    if (enabledProtocolsInput == null) {
+      return new String[] { config.get(TLS_CONFIG_PROTOCOL, DEFAULT_PROTOCOL) };
+    }
+    return enabledProtocolsInput.split(",");
+  }
+
+  private static String[] getCipherSuites(Configuration config) {
+    String cipherSuitesInput = config.get(TLS_CIPHER_SUITES);
+    if (cipherSuitesInput == null) {
+      return getDefaultCipherSuites();
+    } else {
+      return cipherSuitesInput.split(",");
+    }
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/BaseX509ParameterizedTestCase.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/BaseX509ParameterizedTestCase.java
new file mode 100644
index 00000000000..f2001e5173c
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/BaseX509ParameterizedTestCase.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.security.Security;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for parameterized unit tests that use X509TestContext for testing different X509
+ * parameter combinations (CA key type, cert key type, with/without a password, with/without
+ * hostname verification, etc). This base class takes care of setting up / cleaning up the test
+ * environment, and caching the X509TestContext objects used by the tests.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/BaseX509ParameterizedTestCase.java">Base
+ *      revision</a>
+ */
+public abstract class BaseX509ParameterizedTestCase {
+  protected static final String KEY_NON_EMPTY_PASSWORD = "pa$$w0rd";
+  protected static final String KEY_EMPTY_PASSWORD = "";
+
+  /**
+   * Because key generation and writing / deleting files is kind of expensive, we cache the certs
+   * and on-disk files between test cases. None of the test cases modify any of this data so it's
+   * safe to reuse between tests. This caching makes all test cases after the first one for a given
+   * parameter combination complete almost instantly.
+   */
+  protected static Map<Integer, X509TestContext> cachedTestContexts;
+  protected static File tempDir;
+
+  protected X509TestContext x509TestContext;
+
+  @BeforeClass
+  public static void setUpBaseClass() throws Exception {
+    Security.addProvider(new BouncyCastleProvider());
+    cachedTestContexts = new HashMap<>();
+    tempDir = Files.createTempDirectory("x509Tests").toFile();
+  }
+
+  @AfterClass
+  public static void cleanUpBaseClass() {
+    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+    cachedTestContexts.clear();
+    cachedTestContexts = null;
+    try {
+      FileUtils.deleteDirectory(tempDir);
+    } catch (IOException e) {
+      // ignore
+    }
+  }
+
+  /**
+   * Init method. See example usage in {@link TestX509Util}.
+   * @param paramIndex      the index under which the X509TestContext should be cached.
+   * @param contextSupplier a function that creates and returns the X509TestContext for the current
+   *                        index if one is not already cached.
+   */
+  protected void init(Integer paramIndex, Supplier<X509TestContext> contextSupplier) {
+    if (cachedTestContexts.containsKey(paramIndex)) {
+      x509TestContext = cachedTestContexts.get(paramIndex);
+    } else {
+      x509TestContext = contextSupplier.get();
+      cachedTestContexts.put(paramIndex, x509TestContext);
+    }
+  }
+
+  protected void init(final X509KeyType caKeyType, final X509KeyType certKeyType,
+    final String keyPassword, final Integer paramIndex) throws Exception {
+    init(paramIndex, () -> {
+      try {
+        return X509TestContext.newBuilder().setTempDir(tempDir).setKeyStorePassword(keyPassword)
+          .setKeyStoreKeyType(certKeyType).setTrustStorePassword(keyPassword)
+          .setTrustStoreKeyType(caKeyType).build();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
new file mode 100644
index 00000000000..8e3425d2853
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/TestX509Util.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.exceptions.KeyManagerException;
+import org.apache.hadoop.hbase.exceptions.SSLContextException;
+import org.apache.hadoop.hbase.exceptions.TrustManagerException;
+import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
+
+/**
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509UtilTest.java">Base
+ *      revision</a>
+ */
+@RunWith(Parameterized.class)
+@Category({ MiscTests.class, SmallTests.class })
+public class TestX509Util extends BaseX509ParameterizedTestCase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestX509Util.class);
+
+  @Parameterized.Parameter()
+  public X509KeyType caKeyType;
+
+  @Parameterized.Parameter(value = 1)
+  public X509KeyType certKeyType;
+
+  @Parameterized.Parameter(value = 2)
+  public String keyPassword;
+
+  @Parameterized.Parameter(value = 3)
+  public Integer paramIndex;
+
+  @Parameterized.Parameters(
+      name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
+  public static Collection<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    int paramIndex = 0;
+    for (X509KeyType caKeyType : X509KeyType.values()) {
+      for (X509KeyType certKeyType : X509KeyType.values()) {
+        for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
+          params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
+        }
+      }
+    }
+    return params;
+  }
+
+  private Configuration hbaseConf;
+
+  @Override
+  public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
+    Integer paramIndex) throws Exception {
+    super.init(caKeyType, certKeyType, keyPassword, paramIndex);
+    x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+    hbaseConf = x509TestContext.getHbaseConf();
+  }
+
+  @After
+  public void cleanUp() {
+    x509TestContext.clearSystemProperties();
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP);
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR);
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
+    System.clearProperty("com.sun.net.ssl.checkRevocation");
+    System.clearProperty("com.sun.security.enableCRLDP");
+    Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+    Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
+  }
+
+  @Test
+  public void testCreateSSLContextWithoutCustomProtocol() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    SslContext sslContext = X509Util.createSslContextForClient(hbaseConf);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    assertEquals(new String[] { X509Util.DEFAULT_PROTOCOL },
+      sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols());
+  }
+
+  @Test
+  public void testCreateSSLContextWithCustomProtocol() throws Exception {
+    final String protocol = "TLSv1.1";
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.set(X509Util.TLS_CONFIG_PROTOCOL, protocol);
+    ByteBufAllocator byteBufAllocatorMock = mock(ByteBufAllocator.class);
+    SslContext sslContext = X509Util.createSslContextForServer(hbaseConf);
+    assertEquals(Collections.singletonList(protocol),
+      Arrays.asList(sslContext.newEngine(byteBufAllocatorMock).getEnabledProtocols()));
+  }
+
+  @Test(expected = SSLContextException.class)
+  public void testCreateSSLContextWithoutKeyStoreLocationServer() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
+    X509Util.createSslContextForServer(hbaseConf);
+  }
+
+  @Test
+  public void testCreateSSLContextWithoutKeyStoreLocationClient() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
+    X509Util.createSslContextForClient(hbaseConf);
+  }
+
+  @Test(expected = X509Exception.class)
+  public void testCreateSSLContextWithoutKeyStorePassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    if (!x509TestContext.isKeyStoreEncrypted()) {
+      throw new SSLContextException("");
+    }
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
+    X509Util.createSslContextForServer(hbaseConf);
+  }
+
+  @Test
+  public void testCreateSSLContextWithoutTrustStoreLocationClient() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
+    X509Util.createSslContextForClient(hbaseConf);
+  }
+
+  @Test
+  public void testCreateSSLContextWithoutTrustStoreLocationServer() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
+    X509Util.createSslContextForServer(hbaseConf);
+  }
+
+  // It would be great to test the value of PKIXBuilderParameters#setRevocationEnabled,
+  // but it does not appear to be possible
+  @Test
+  public void testCRLEnabled() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    hbaseConf.setBoolean(X509Util.TLS_CONFIG_CLR, true);
+    X509Util.createSslContextForServer(hbaseConf);
+    assertTrue(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
+    assertTrue(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
+    assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
+  }
+
+  @Test
+  public void testCRLDisabled() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    X509Util.createSslContextForServer(hbaseConf);
+    assertFalse(Boolean.valueOf(System.getProperty("com.sun.net.ssl.checkRevocation")));
+    assertFalse(Boolean.valueOf(System.getProperty("com.sun.security.enableCRLDP")));
+    assertFalse(Boolean.valueOf(Security.getProperty("ocsp.enable")));
+  }
+
+  @Test
+  public void testLoadJKSKeyStore() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a key manager from the JKS file on disk
+    X509Util.createKeyManager(
+      x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+      x509TestContext.getKeyStorePassword(), KeyStoreFileType.JKS.getPropertyValue());
+  }
+
+  @Test
+  public void testLoadJKSKeyStoreNullPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    if (!x509TestContext.getKeyStorePassword().isEmpty()) {
+      return;
+    }
+    // Make sure that empty password and null password are treated the same
+    X509Util.createKeyManager(
+      x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
+      KeyStoreFileType.JKS.getPropertyValue());
+  }
+
+  @Test
+  public void testLoadJKSKeyStoreFileTypeDefaultToJks() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a key manager from the JKS file on disk
+    X509Util.createKeyManager(
+      x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+      x509TestContext.getKeyStorePassword(),
+      null /* null StoreFileType means 'autodetect from file extension' */);
+  }
+
+  @Test
+  public void testLoadJKSKeyStoreWithWrongPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    assertThrows(KeyManagerException.class, () -> {
+      // Attempting to load with the wrong key password should fail
+      X509Util.createKeyManager(
+        x509TestContext.getKeyStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), "wrong password",
+        KeyStoreFileType.JKS.getPropertyValue());
+    });
+  }
+
+  @Test
+  public void testLoadJKSTrustStore() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a trust manager from the JKS file on disk
+    X509Util.createTrustManager(
+      x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+      x509TestContext.getTrustStorePassword(), KeyStoreFileType.JKS.getPropertyValue(), true, true);
+  }
+
+  @Test
+  public void testLoadJKSTrustStoreNullPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    if (!x509TestContext.getTrustStorePassword().isEmpty()) {
+      return;
+    }
+    // Make sure that empty password and null password are treated the same
+    X509Util.createTrustManager(
+      x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), null,
+      KeyStoreFileType.JKS.getPropertyValue(), false, false);
+  }
+
+  @Test
+  public void testLoadJKSTrustStoreFileTypeDefaultToJks() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a trust manager from the JKS file on disk
+    X509Util.createTrustManager(
+      x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(),
+      x509TestContext.getTrustStorePassword(), null, // null StoreFileType means 'autodetect from
+                                                     // file extension'
+      true, true);
+  }
+
+  @Test
+  public void testLoadJKSTrustStoreWithWrongPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    assertThrows(TrustManagerException.class, () -> {
+      // Attempting to load with the wrong key password should fail
+      X509Util.createTrustManager(
+        x509TestContext.getTrustStoreFile(KeyStoreFileType.JKS).getAbsolutePath(), "wrong password",
+        KeyStoreFileType.JKS.getPropertyValue(), true, true);
+    });
+  }
+
+  @Test
+  public void testLoadPKCS12KeyStore() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a key manager from the PKCS12 file on disk
+    X509Util.createKeyManager(
+      x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+      x509TestContext.getKeyStorePassword(), KeyStoreFileType.PKCS12.getPropertyValue());
+  }
+
+  @Test
+  public void testLoadPKCS12KeyStoreNullPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    if (!x509TestContext.getKeyStorePassword().isEmpty()) {
+      return;
+    }
+    // Make sure that empty password and null password are treated the same
+    X509Util.createKeyManager(
+      x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), null,
+      KeyStoreFileType.PKCS12.getPropertyValue());
+  }
+
+  @Test
+  public void testLoadPKCS12KeyStoreWithWrongPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    assertThrows(KeyManagerException.class, () -> {
+      // Attempting to load with the wrong key password should fail
+      X509Util.createKeyManager(
+        x509TestContext.getKeyStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+        "wrong password", KeyStoreFileType.PKCS12.getPropertyValue());
+    });
+  }
+
+  @Test
+  public void testLoadPKCS12TrustStore() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    // Make sure we can instantiate a trust manager from the PKCS12 file on disk
+    X509Util.createTrustManager(
+      x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+      x509TestContext.getTrustStorePassword(), KeyStoreFileType.PKCS12.getPropertyValue(), true,
+      true);
+  }
+
+  @Test
+  public void testLoadPKCS12TrustStoreNullPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    if (!x509TestContext.getTrustStorePassword().isEmpty()) {
+      return;
+    }
+    // Make sure that empty password and null password are treated the same
+    X509Util.createTrustManager(
+      x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(), null,
+      KeyStoreFileType.PKCS12.getPropertyValue(), false, false);
+  }
+
+  @Test
+  public void testLoadPKCS12TrustStoreWithWrongPassword() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    assertThrows(TrustManagerException.class, () -> {
+      // Attempting to load with the wrong key password should fail
+      X509Util.createTrustManager(
+        x509TestContext.getTrustStoreFile(KeyStoreFileType.PKCS12).getAbsolutePath(),
+        "wrong password", KeyStoreFileType.PKCS12.getPropertyValue(), true, true);
+    });
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesJava8() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("1.8");
+    // Java 8 default should have the CBC suites first
+    assertTrue(cipherSuites[0].contains("CBC"));
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesJava9() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("9");
+    // Java 9+ default should have the GCM suites first
+    assertTrue(cipherSuites[0].contains("GCM"));
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesJava10() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("10");
+    // Java 9+ default should have the GCM suites first
+    assertTrue(cipherSuites[0].contains("GCM"));
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesJava11() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("11");
+    // Java 9+ default should have the GCM suites first
+    assertTrue(cipherSuites[0].contains("GCM"));
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesUnknownVersion() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String[] cipherSuites = X509Util.getDefaultCipherSuitesForJavaVersion("notaversion");
+    // If version can't be parsed, use the more conservative Java 8 default
+    assertTrue(cipherSuites[0].contains("CBC"));
+  }
+
+  @Test
+  public void testGetDefaultCipherSuitesNullVersion() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    assertThrows(NullPointerException.class, () -> {
+      X509Util.getDefaultCipherSuitesForJavaVersion(null);
+    });
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509KeyType.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509KeyType.java
new file mode 100644
index 00000000000..1d5c042f04a
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509KeyType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+/**
+ * Represents a type of key pair used for X509 certs in tests. The two options are RSA or EC
+ * (elliptic curve).
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509KeyType.java">Base
+ *      revision</a>
+ */
+public enum X509KeyType {
+  RSA,
+  EC
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
new file mode 100644
index 00000000000..3eee7e64f76
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.KeyPair;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.bouncycastle.asn1.x500.X500NameBuilder;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+
+/**
+ * This class simplifies the creation of certificates and private keys for SSL/TLS connections.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509TestContext.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+public final class X509TestContext {
+
+  private static final String TRUST_STORE_PREFIX = "hbase_test_ca";
+  private static final String KEY_STORE_PREFIX = "hbase_test_key";
+
+  private final File tempDir;
+  private final Configuration hbaseConf = HBaseConfiguration.create();
+
+  private final X509Certificate trustStoreCertificate;
+  private final String trustStorePassword;
+  private File trustStoreJksFile;
+  private File trustStorePemFile;
+  private File trustStorePkcs12File;
+
+  private final KeyPair keyStoreKeyPair;
+  private final X509Certificate keyStoreCertificate;
+  private final String keyStorePassword;
+  private File keyStoreJksFile;
+  private File keyStorePemFile;
+  private File keyStorePkcs12File;
+
+  /**
+   * Constructor is intentionally private, use the Builder class instead.
+   * @param tempDir            the directory in which key store and trust store temp files will be
+   *                           written.
+   * @param trustStoreKeyPair  the key pair for the trust store.
+   * @param trustStorePassword the password to protect a JKS trust store (ignored for PEM trust
+   *                           stores).
+   * @param keyStoreKeyPair    the key pair for the key store.
+   * @param keyStorePassword   the password to protect the key store private key.
+   */
+  private X509TestContext(File tempDir, KeyPair trustStoreKeyPair, String trustStorePassword,
+    KeyPair keyStoreKeyPair, String keyStorePassword)
+    throws IOException, GeneralSecurityException, OperatorCreationException {
+    if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) {
+      throw new IllegalStateException("BC Security provider was not found");
+    }
+    this.tempDir = requireNonNull(tempDir);
+    if (!tempDir.isDirectory()) {
+      throw new IllegalArgumentException("Not a directory: " + tempDir);
+    }
+    this.trustStorePassword = requireNonNull(trustStorePassword);
+    this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair);
+    this.keyStorePassword = requireNonNull(keyStorePassword);
+
+    X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    caNameBuilder.addRDN(BCStyle.CN,
+      MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
+    trustStoreCertificate =
+      X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+
+    X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    nameBuilder.addRDN(BCStyle.CN,
+      MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
+    keyStoreCertificate = X509TestHelpers.newCert(trustStoreCertificate, trustStoreKeyPair,
+      nameBuilder.build(), keyStoreKeyPair.getPublic());
+    trustStorePkcs12File = null;
+    trustStorePemFile = null;
+    trustStoreJksFile = null;
+    keyStorePkcs12File = null;
+    keyStorePemFile = null;
+    keyStoreJksFile = null;
+  }
+
+  public File getTempDir() {
+    return tempDir;
+  }
+
+  public String getTrustStorePassword() {
+    return trustStorePassword;
+  }
+
+  /**
+   * Returns the path to the trust store file in the given format (JKS or PEM). Note that the file
+   * is created lazily, the first time this method is called. The trust store file is temporary and
+   * will be deleted on exit.
+   * @param storeFileType the store file type (JKS or PEM).
+   * @return the path to the trust store file.
+   * @throws IOException if there is an error creating the trust store file.
+   */
+  public File getTrustStoreFile(KeyStoreFileType storeFileType) throws IOException {
+    switch (storeFileType) {
+      case JKS:
+        return getTrustStoreJksFile();
+      case PEM:
+        return getTrustStorePemFile();
+      case PKCS12:
+        return getTrustStorePkcs12File();
+      default:
+        throw new IllegalArgumentException("Invalid trust store type: " + storeFileType
+          + ", must be one of: " + Arrays.toString(KeyStoreFileType.values()));
+    }
+  }
+
+  private File getTrustStoreJksFile() throws IOException {
+    if (trustStoreJksFile == null) {
+      File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
+        KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
+      trustStoreJksFile.deleteOnExit();
+      try (
+        final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
+        byte[] bytes =
+          X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+        trustStoreOutputStream.write(bytes);
+        trustStoreOutputStream.flush();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+      this.trustStoreJksFile = trustStoreJksFile;
+    }
+    return trustStoreJksFile;
+  }
+
+  private File getTrustStorePemFile() throws IOException {
+    if (trustStorePemFile == null) {
+      File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
+        KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
+      trustStorePemFile.deleteOnExit();
+      FileUtils.writeStringToFile(trustStorePemFile,
+        X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
+        false);
+      this.trustStorePemFile = trustStorePemFile;
+    }
+    return trustStorePemFile;
+  }
+
+  private File getTrustStorePkcs12File() throws IOException {
+    if (trustStorePkcs12File == null) {
+      File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
+        KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
+      trustStorePkcs12File.deleteOnExit();
+      try (final FileOutputStream trustStoreOutputStream =
+        new FileOutputStream(trustStorePkcs12File)) {
+        byte[] bytes =
+          X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
+        trustStoreOutputStream.write(bytes);
+        trustStoreOutputStream.flush();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+      this.trustStorePkcs12File = trustStorePkcs12File;
+    }
+    return trustStorePkcs12File;
+  }
+
+  public X509Certificate getKeyStoreCertificate() {
+    return keyStoreCertificate;
+  }
+
+  public String getKeyStorePassword() {
+    return keyStorePassword;
+  }
+
+  public boolean isKeyStoreEncrypted() {
+    return keyStorePassword.length() > 0;
+  }
+
+  public Configuration getHbaseConf() {
+    return hbaseConf;
+  }
+
+  /**
+   * Returns the path to the key store file in the given format (JKS, PEM, ...). Note that the file
+   * is created lazily, the first time this method is called. The key store file is temporary and
+   * will be deleted on exit.
+   * @param storeFileType the store file type (JKS, PEM, ...).
+   * @return the path to the key store file.
+   * @throws IOException if there is an error creating the key store file.
+   */
+  public File getKeyStoreFile(KeyStoreFileType storeFileType) throws IOException {
+    switch (storeFileType) {
+      case JKS:
+        return getKeyStoreJksFile();
+      case PEM:
+        return getKeyStorePemFile();
+      case PKCS12:
+        return getKeyStorePkcs12File();
+      default:
+        throw new IllegalArgumentException("Invalid key store type: " + storeFileType
+          + ", must be one of: " + Arrays.toString(KeyStoreFileType.values()));
+    }
+  }
+
+  private File getKeyStoreJksFile() throws IOException {
+    if (keyStoreJksFile == null) {
+      File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
+        KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
+      keyStoreJksFile.deleteOnExit();
+      try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
+        byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
+          keyStoreKeyPair.getPrivate(), keyStorePassword);
+        keyStoreOutputStream.write(bytes);
+        keyStoreOutputStream.flush();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+      this.keyStoreJksFile = keyStoreJksFile;
+    }
+    return keyStoreJksFile;
+  }
+
+  private File getKeyStorePemFile() throws IOException {
+    if (keyStorePemFile == null) {
+      try {
+        File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
+          KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
+        keyStorePemFile.deleteOnExit();
+        FileUtils.writeStringToFile(keyStorePemFile,
+          X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate,
+            keyStoreKeyPair.getPrivate(), keyStorePassword),
+          StandardCharsets.US_ASCII, false);
+        this.keyStorePemFile = keyStorePemFile;
+      } catch (OperatorCreationException e) {
+        throw new IOException(e);
+      }
+    }
+    return keyStorePemFile;
+  }
+
+  private File getKeyStorePkcs12File() throws IOException {
+    if (keyStorePkcs12File == null) {
+      File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
+        KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
+      keyStorePkcs12File.deleteOnExit();
+      try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
+        byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
+          keyStoreKeyPair.getPrivate(), keyStorePassword);
+        keyStoreOutputStream.write(bytes);
+        keyStoreOutputStream.flush();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+      this.keyStorePkcs12File = keyStorePkcs12File;
+    }
+    return keyStorePkcs12File;
+  }
+
+  /**
+   * Sets the SSL system properties such that the given X509Util object can be used to create SSL
+   * Contexts that will use the trust store and key store files created by this test context.
+   * Example usage:
+   *
+   * <pre>
+   *     X509TestContext testContext = ...; // create the test context
+   *     X509Util x509Util = new QuorumX509Util();
+   *     testContext.setSystemProperties(x509Util, KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+   *     // The returned context will use the key store and trust store created by the test context.
+   *     SSLContext ctx = x509Util.getDefaultSSLContext();
+   * </pre>
+   *
+   * @param keyStoreFileType   the store file type to use for the key store (JKS, PEM, ...).
+   * @param trustStoreFileType the store file type to use for the trust store (JKS, PEM, ...).
+   * @throws IOException if there is an error creating the key store file or trust store file.
+   */
+  public void setSystemProperties(KeyStoreFileType keyStoreFileType,
+    KeyStoreFileType trustStoreFileType) throws IOException {
+    hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION,
+      this.getKeyStoreFile(keyStoreFileType).getAbsolutePath());
+    hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD, this.getKeyStorePassword());
+    hbaseConf.set(X509Util.TLS_CONFIG_KEYSTORE_TYPE, keyStoreFileType.getPropertyValue());
+    hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION,
+      this.getTrustStoreFile(trustStoreFileType).getAbsolutePath());
+    hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD, this.getTrustStorePassword());
+    hbaseConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE, trustStoreFileType.getPropertyValue());
+  }
+
+  public void clearSystemProperties() {
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_PASSWORD);
+    hbaseConf.unset(X509Util.TLS_CONFIG_KEYSTORE_TYPE);
+    hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION);
+    hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_PASSWORD);
+    hbaseConf.unset(X509Util.TLS_CONFIG_TRUSTSTORE_TYPE);
+  }
+
+  /**
+   * Builder class, used for creating new instances of X509TestContext.
+   */
+  public static class Builder {
+
+    private File tempDir;
+    private X509KeyType trustStoreKeyType;
+    private String trustStorePassword;
+    private X509KeyType keyStoreKeyType;
+    private String keyStorePassword;
+
+    /**
+     * Creates an empty builder.
+     */
+    public Builder() {
+      trustStoreKeyType = X509KeyType.EC;
+      trustStorePassword = "";
+      keyStoreKeyType = X509KeyType.EC;
+      keyStorePassword = "";
+    }
+
+    /**
+     * Builds a new X509TestContext from this builder.
+     * @return a new X509TestContext
+     */
+    public X509TestContext build()
+      throws IOException, GeneralSecurityException, OperatorCreationException {
+      KeyPair trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
+      KeyPair keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
+      return new X509TestContext(tempDir, trustStoreKeyPair, trustStorePassword, keyStoreKeyPair,
+        keyStorePassword);
+    }
+
+    /**
+     * Sets the temporary directory. Certificate and private key files will be created in this
+     * directory.
+     * @param tempDir the temp directory.
+     * @return this Builder.
+     */
+    public Builder setTempDir(File tempDir) {
+      this.tempDir = tempDir;
+      return this;
+    }
+
+    /**
+     * Sets the trust store key type. The CA key generated for the test context will be of this
+     * type.
+     * @param keyType the key type.
+     * @return this Builder.
+     */
+    public Builder setTrustStoreKeyType(X509KeyType keyType) {
+      trustStoreKeyType = keyType;
+      return this;
+    }
+
+    /**
+     * Sets the trust store password. Ignored for PEM trust stores, JKS trust stores will be
+     * encrypted with this password.
+     * @param password the password.
+     * @return this Builder.
+     */
+    public Builder setTrustStorePassword(String password) {
+      trustStorePassword = password;
+      return this;
+    }
+
+    /**
+     * Sets the key store key type. The private key generated for the test context will be of this
+     * type.
+     * @param keyType the key type.
+     * @return this Builder.
+     */
+    public Builder setKeyStoreKeyType(X509KeyType keyType) {
+      keyStoreKeyType = keyType;
+      return this;
+    }
+
+    /**
+     * Sets the key store password. The private key (PEM, JKS) and certificate (JKS only) will be
+     * encrypted with this password.
+     * @param password the password.
+     * @return this Builder.
+     */
+    public Builder setKeyStorePassword(String password) {
+      keyStorePassword = password;
+      return this;
+    }
+  }
+
+  /**
+   * Returns a new default-constructed Builder.
+   * @return a new Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
new file mode 100644
index 00000000000..d4489c1d589
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestHelpers.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.crypto.tls;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.GeneralSecurityException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.SecureRandom;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.security.spec.ECGenParameterSpec;
+import java.security.spec.RSAKeyGenParameterSpec;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.bouncycastle.asn1.DERIA5String;
+import org.bouncycastle.asn1.DEROctetString;
+import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.BasicConstraints;
+import org.bouncycastle.asn1.x509.ExtendedKeyUsage;
+import org.bouncycastle.asn1.x509.Extension;
+import org.bouncycastle.asn1.x509.GeneralName;
+import org.bouncycastle.asn1.x509.GeneralNames;
+import org.bouncycastle.asn1.x509.KeyPurposeId;
+import org.bouncycastle.asn1.x509.KeyUsage;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.jcajce.JcaPEMWriter;
+import org.bouncycastle.openssl.jcajce.JcaPKCS8Generator;
+import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.operator.OutputEncryptor;
+import org.bouncycastle.operator.bc.BcContentSignerBuilder;
+import org.bouncycastle.operator.bc.BcECContentSignerBuilder;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+
+/**
+ * This class contains helper methods for creating X509 certificates and key pairs, and for
+ * serializing them to JKS, PEM or other keystore type files.
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/c74658d398cdc1d207aa296cb6e20de00faec03e/zookeeper-server/src/test/java/org/apache/zookeeper/common/X509TestHelpers.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+final class X509TestHelpers {
+
+  private static final SecureRandom PRNG = new SecureRandom();
+  private static final int DEFAULT_RSA_KEY_SIZE_BITS = 2048;
+  private static final BigInteger DEFAULT_RSA_PUB_EXPONENT = RSAKeyGenParameterSpec.F4; // 65537
+  private static final String DEFAULT_ELLIPTIC_CURVE_NAME = "secp256r1";
+  // Per RFC 5280 section 4.1.2.2, X509 certificates can use up to 20 bytes == 160 bits for serial
+  // numbers.
+  private static final int SERIAL_NUMBER_MAX_BITS = 20 * Byte.SIZE;
+
+  /**
+   * Uses the private key of the given key pair to create a self-signed CA certificate with the
+   * public half of the key pair and the given subject and expiration. The issuer of the new cert
+   * will be equal to the subject. Returns the new certificate. The returned certificate should be
+   * used as the trust store. The private key of the input key pair should be used to sign
+   * certificates that are used by test peers to establish TLS connections to each other.
+   * @param subject the subject of the new certificate being created.
+   * @param keyPair the key pair to use. The public key will be embedded in the new certificate, and
+   *                the private key will be used to self-sign the certificate.
+   * @return a new self-signed CA certificate.
+   */
+  public static X509Certificate newSelfSignedCACert(X500Name subject, KeyPair keyPair)
+    throws IOException, OperatorCreationException, GeneralSecurityException {
+    LocalDate now = LocalDate.now(ZoneId.systemDefault());
+    X509v3CertificateBuilder builder = initCertBuilder(subject, // for self-signed certs,
+      // issuer == subject
+      now, now.plusDays(1), subject, keyPair.getPublic());
+    builder.addExtension(Extension.basicConstraints, true, new BasicConstraints(true)); // is a CA
+    builder.addExtension(Extension.keyUsage, true,
+      new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyCertSign | KeyUsage.cRLSign));
+    return buildAndSignCertificate(keyPair.getPrivate(), builder);
+  }
+
+  /**
+   * Using the private key of the given CA key pair and the Subject of the given CA cert as the
+   * Issuer, issues a new cert with the given subject and public key. The returned certificate,
+   * combined with the private key half of the <code>certPublicKey</code>, should be used as the key
+   * store.
+   * @param caCert        the certificate of the CA that's doing the signing.
+   * @param caKeyPair     the key pair of the CA. The private key will be used to sign. The public
+   *                      key must match the public key in the <code>caCert</code>.
+   * @param certSubject   the subject field of the new cert being issued.
+   * @param certPublicKey the public key of the new cert being issued.
+   * @return a new certificate signed by the CA's private key.
+   */
+  public static X509Certificate newCert(X509Certificate caCert, KeyPair caKeyPair,
+    X500Name certSubject, PublicKey certPublicKey)
+    throws IOException, OperatorCreationException, GeneralSecurityException {
+    if (!caKeyPair.getPublic().equals(caCert.getPublicKey())) {
+      throw new IllegalArgumentException(
+        "CA private key does not match the public key in " + "the CA cert");
+    }
+    LocalDate now = LocalDate.now(ZoneId.systemDefault());
+    X509v3CertificateBuilder builder = initCertBuilder(new X500Name(caCert.getIssuerDN().getName()),
+      now, now.plusDays(1), certSubject, certPublicKey);
+    builder.addExtension(Extension.basicConstraints, true, new BasicConstraints(false)); // not a CA
+    builder.addExtension(Extension.keyUsage, true,
+      new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment));
+    builder.addExtension(Extension.extendedKeyUsage, true, new ExtendedKeyUsage(
+      new KeyPurposeId[] { KeyPurposeId.id_kp_serverAuth, KeyPurposeId.id_kp_clientAuth }));
+
+    builder.addExtension(Extension.subjectAlternativeName, false, getLocalhostSubjectAltNames());
+    return buildAndSignCertificate(caKeyPair.getPrivate(), builder);
+  }
+
+  /**
+   * Returns subject alternative names for "localhost".
+   * @return the subject alternative names for "localhost".
+   */
+  private static GeneralNames getLocalhostSubjectAltNames() throws UnknownHostException {
+    InetAddress[] localAddresses = InetAddress.getAllByName("localhost");
+    GeneralName[] generalNames = new GeneralName[localAddresses.length + 1];
+    for (int i = 0; i < localAddresses.length; i++) {
+      generalNames[i] =
+        new GeneralName(GeneralName.iPAddress, new DEROctetString(localAddresses[i].getAddress()));
+    }
+    generalNames[generalNames.length - 1] =
+      new GeneralName(GeneralName.dNSName, new DERIA5String("localhost"));
+    return new GeneralNames(generalNames);
+  }
+
+  /**
+   * Helper method for newSelfSignedCACert() and newCert(). Initializes a X509v3CertificateBuilder
+   * with logic that's common to both methods.
+   * @param issuer           Issuer field of the new cert.
+   * @param notBefore        date before which the new cert is not valid.
+   * @param notAfter         date after which the new cert is not valid.
+   * @param subject          Subject field of the new cert.
+   * @param subjectPublicKey public key to store in the new cert.
+   * @return a X509v3CertificateBuilder that can be further customized to finish creating the new
+   *         cert.
+   */
+  private static X509v3CertificateBuilder initCertBuilder(X500Name issuer, LocalDate notBefore,
+    LocalDate notAfter, X500Name subject, PublicKey subjectPublicKey) {
+    return new X509v3CertificateBuilder(issuer, new BigInteger(SERIAL_NUMBER_MAX_BITS, PRNG),
+      java.sql.Date.valueOf(notBefore), java.sql.Date.valueOf(notAfter), subject,
+      SubjectPublicKeyInfo.getInstance(subjectPublicKey.getEncoded()));
+  }
+
+  /**
+   * Signs the certificate being built by the given builder using the given private key and returns
+   * the certificate.
+   * @param privateKey the private key to sign the certificate with.
+   * @param builder    the cert builder that contains the certificate data.
+   * @return the signed certificate.
+   */
+  private static X509Certificate buildAndSignCertificate(PrivateKey privateKey,
+    X509v3CertificateBuilder builder)
+    throws IOException, OperatorCreationException, CertificateException {
+    BcContentSignerBuilder signerBuilder;
+    if (privateKey.getAlgorithm().contains("RSA")) { // a little hacky way to detect key type, but
+      // it works
+      AlgorithmIdentifier signatureAlgorithm =
+        new DefaultSignatureAlgorithmIdentifierFinder().find("SHA256WithRSAEncryption");
+      AlgorithmIdentifier digestAlgorithm =
+        new DefaultDigestAlgorithmIdentifierFinder().find(signatureAlgorithm);
+      signerBuilder = new BcRSAContentSignerBuilder(signatureAlgorithm, digestAlgorithm);
+    } else { // if not RSA, assume EC
+      AlgorithmIdentifier signatureAlgorithm =
+        new DefaultSignatureAlgorithmIdentifierFinder().find("SHA256withECDSA");
+      AlgorithmIdentifier digestAlgorithm =
+        new DefaultDigestAlgorithmIdentifierFinder().find(signatureAlgorithm);
+      signerBuilder = new BcECContentSignerBuilder(signatureAlgorithm, digestAlgorithm);
+    }
+    AsymmetricKeyParameter privateKeyParam = PrivateKeyFactory.createKey(privateKey.getEncoded());
+    ContentSigner signer = signerBuilder.build(privateKeyParam);
+    return toX509Cert(builder.build(signer));
+  }
+
+  /**
+   * Generates a new asymmetric key pair of the given type.
+   * @param keyType the type of key pair to generate.
+   * @return the new key pair.
+   * @throws GeneralSecurityException if your java crypto providers are messed up.
+   */
+  public static KeyPair generateKeyPair(X509KeyType keyType) throws GeneralSecurityException {
+    switch (keyType) {
+      case RSA:
+        return generateRSAKeyPair();
+      case EC:
+        return generateECKeyPair();
+      default:
+        throw new IllegalArgumentException("Invalid X509KeyType");
+    }
+  }
+
+  /**
+   * Generates an RSA key pair with a 2048-bit private key and F4 (65537) as the public exponent.
+   * @return the key pair.
+   */
+  public static KeyPair generateRSAKeyPair() throws GeneralSecurityException {
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
+    RSAKeyGenParameterSpec keyGenSpec =
+      new RSAKeyGenParameterSpec(DEFAULT_RSA_KEY_SIZE_BITS, DEFAULT_RSA_PUB_EXPONENT);
+    keyGen.initialize(keyGenSpec, PRNG);
+    return keyGen.generateKeyPair();
+  }
+
+  /**
+   * Generates an elliptic curve key pair using the "secp256r1" aka "prime256v1" aka "NIST P-256"
+   * curve.
+   * @return the key pair.
+   */
+  public static KeyPair generateECKeyPair() throws GeneralSecurityException {
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance("EC");
+    keyGen.initialize(new ECGenParameterSpec(DEFAULT_ELLIPTIC_CURVE_NAME), PRNG);
+    return keyGen.generateKeyPair();
+  }
+
+  /**
+   * PEM-encodes the given X509 certificate and private key (compatible with OpenSSL), optionally
+   * protecting the private key with a password. Concatenates them both and returns the result as a
+   * single string. This creates the PEM encoding of a key store.
+   * @param cert        the X509 certificate to PEM-encode.
+   * @param privateKey  the private key to PEM-encode.
+   * @param keyPassword an optional key password. If empty or null, the private key will not be
+   *                    encrypted.
+   * @return a String containing the PEM encodings of the certificate and private key.
+   * @throws IOException               if converting the certificate or private key to PEM format
+   *                                   fails.
+   * @throws OperatorCreationException if constructing the encryptor from the given password fails.
+   */
+  public static String pemEncodeCertAndPrivateKey(X509Certificate cert, PrivateKey privateKey,
+    String keyPassword) throws IOException, OperatorCreationException {
+    return pemEncodeX509Certificate(cert) + "\n" + pemEncodePrivateKey(privateKey, keyPassword);
+  }
+
+  /**
+   * PEM-encodes the given private key (compatible with OpenSSL), optionally protecting it with a
+   * password, and returns the result as a String.
+   * @param key      the private key.
+   * @param password an optional key password. If empty or null, the private key will not be
+   *                 encrypted.
+   * @return a String containing the PEM encoding of the private key.
+   * @throws IOException               if converting the key to PEM format fails.
+   * @throws OperatorCreationException if constructing the encryptor from the given password fails.
+   */
+  public static String pemEncodePrivateKey(PrivateKey key, String password)
+    throws IOException, OperatorCreationException {
+    StringWriter stringWriter = new StringWriter();
+    JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter);
+    OutputEncryptor encryptor = null;
+    if (password != null && password.length() > 0) {
+      encryptor =
+        new JceOpenSSLPKCS8EncryptorBuilder(PKCSObjectIdentifiers.pbeWithSHAAnd3_KeyTripleDES_CBC)
+          .setProvider(BouncyCastleProvider.PROVIDER_NAME).setRandom(PRNG)
+          .setPasssword(password.toCharArray()).build();
+    }
+    pemWriter.writeObject(new JcaPKCS8Generator(key, encryptor));
+    pemWriter.close();
+    return stringWriter.toString();
+  }
+
+  /**
+   * PEM-encodes the given X509 certificate (compatible with OpenSSL) and returns the result as a
+   * String.
+   * @param cert the certificate.
+   * @return a String containing the PEM encoding of the certificate.
+   * @throws IOException if converting the certificate to PEM format fails.
+   */
+  public static String pemEncodeX509Certificate(X509Certificate cert) throws IOException {
+    StringWriter stringWriter = new StringWriter();
+    JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter);
+    pemWriter.writeObject(cert);
+    pemWriter.close();
+    return stringWriter.toString();
+  }
+
+  /**
+   * Encodes the given X509Certificate as a JKS TrustStore, optionally protecting the cert with a
+   * password (though it's unclear why one would do this since certificates only contain public
+   * information and do not need to be kept secret). Returns the byte array encoding of the trust
+   * store, which may be written to a file and loaded to instantiate the trust store at a later
+   * point or in another process.
+   * @param cert        the certificate to serialize.
+   * @param keyPassword an optional password to encrypt the trust store. If empty or null, the cert
+   *                    will not be encrypted.
+   * @return the serialized bytes of the JKS trust store.
+   */
+  public static byte[] certToJavaTrustStoreBytes(X509Certificate cert, String keyPassword)
+    throws IOException, GeneralSecurityException {
+    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+    return certToTrustStoreBytes(cert, keyPassword, trustStore);
+  }
+
+  /**
+   * Encodes the given X509Certificate as a PKCS12 TrustStore, optionally protecting the cert with a
+   * password (though it's unclear why one would do this since certificates only contain public
+   * information and do not need to be kept secret). Returns the byte array encoding of the trust
+   * store, which may be written to a file and loaded to instantiate the trust store at a later
+   * point or in another process.
+   * @param cert        the certificate to serialize.
+   * @param keyPassword an optional password to encrypt the trust store. If empty or null, the cert
+   *                    will not be encrypted.
+   * @return the serialized bytes of the PKCS12 trust store.
+   */
+  public static byte[] certToPKCS12TrustStoreBytes(X509Certificate cert, String keyPassword)
+    throws IOException, GeneralSecurityException {
+    KeyStore trustStore = KeyStore.getInstance("PKCS12");
+    return certToTrustStoreBytes(cert, keyPassword, trustStore);
+  }
+
+  private static byte[] certToTrustStoreBytes(X509Certificate cert, String keyPassword,
+    KeyStore trustStore) throws IOException, GeneralSecurityException {
+    char[] keyPasswordChars = keyPassword == null ? new char[0] : keyPassword.toCharArray();
+    trustStore.load(null, keyPasswordChars);
+    trustStore.setCertificateEntry(cert.getSubjectDN().toString(), cert);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    trustStore.store(outputStream, keyPasswordChars);
+    outputStream.flush();
+    byte[] result = outputStream.toByteArray();
+    outputStream.close();
+    return result;
+  }
+
+  /**
+   * Encodes the given X509Certificate and private key as a JKS KeyStore, optionally protecting the
+   * private key (and possibly the cert?) with a password. Returns the byte array encoding of the
+   * key store, which may be written to a file and loaded to instantiate the key store at a later
+   * point or in another process.
+   * @param cert        the X509 certificate to serialize.
+   * @param privateKey  the private key to serialize.
+   * @param keyPassword an optional key password. If empty or null, the private key will not be
+   *                    encrypted.
+   * @return the serialized bytes of the JKS key store.
+   */
+  public static byte[] certAndPrivateKeyToJavaKeyStoreBytes(X509Certificate cert,
+    PrivateKey privateKey, String keyPassword) throws IOException, GeneralSecurityException {
+    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+    return certAndPrivateKeyToBytes(cert, privateKey, keyPassword, keyStore);
+  }
+
+  /**
+   * Encodes the given X509Certificate and private key as a PKCS12 KeyStore, optionally protecting
+   * the private key (and possibly the cert?) with a password. Returns the byte array encoding of
+   * the key store, which may be written to a file and loaded to instantiate the key store at a
+   * later point or in another process.
+   * @param cert        the X509 certificate to serialize.
+   * @param privateKey  the private key to serialize.
+   * @param keyPassword an optional key password. If empty or null, the private key will not be
+   *                    encrypted.
+   * @return the serialized bytes of the PKCS12 key store.
+   */
+  public static byte[] certAndPrivateKeyToPKCS12Bytes(X509Certificate cert, PrivateKey privateKey,
+    String keyPassword) throws IOException, GeneralSecurityException {
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+    return certAndPrivateKeyToBytes(cert, privateKey, keyPassword, keyStore);
+  }
+
+  private static byte[] certAndPrivateKeyToBytes(X509Certificate cert, PrivateKey privateKey,
+    String keyPassword, KeyStore keyStore) throws IOException, GeneralSecurityException {
+    char[] keyPasswordChars = keyPassword == null ? new char[0] : keyPassword.toCharArray();
+    keyStore.load(null, keyPasswordChars);
+    keyStore.setKeyEntry("key", privateKey, keyPasswordChars, new Certificate[] { cert });
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    keyStore.store(outputStream, keyPasswordChars);
+    outputStream.flush();
+    byte[] result = outputStream.toByteArray();
+    outputStream.close();
+    return result;
+  }
+
+  /**
+   * Convenience method to convert a bouncycastle X509CertificateHolder to a java X509Certificate.
+   * @param certHolder a bouncycastle X509CertificateHolder.
+   * @return a java X509Certificate
+   * @throws CertificateException if the conversion fails.
+   */
+  public static X509Certificate toX509Cert(X509CertificateHolder certHolder)
+    throws CertificateException {
+    return new JcaX509CertificateConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME)
+      .getCertificate(certHolder);
+  }
+
+  private X509TestHelpers() {
+    // empty
+  }
+}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index f9e52983af5..f00fba295e2 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -339,6 +339,16 @@
       <artifactId>log4j-1.2-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <!-- Make sure resources get added before they are processed by placing this first
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 8f12b245030..88cd0776968 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -17,15 +17,21 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HBaseServerBase;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -47,6 +53,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
 import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
 
 /**
@@ -106,6 +114,9 @@ public class NettyRpcServer extends RpcServer {
           ChannelPipeline pipeline = ch.pipeline();
           FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
           preambleDecoder.setSingleDecode(true);
+          if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
+            initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
+          }
           pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder);
           pipeline.addLast(createNettyRpcServerPreambleHandler(),
             new NettyRpcServerResponseEncoder(metrics));
@@ -214,4 +225,17 @@ public class NettyRpcServer extends RpcServer {
     // allChannels also contains the server channel, so exclude that from the count.
     return channelsCount > 0 ? channelsCount - 1 : channelsCount;
   }
+
+  private void initSSL(ChannelPipeline p, boolean supportPlaintext)
+    throws X509Exception, SSLException {
+    SslContext nettySslContext = X509Util.createSslContextForServer(conf);
+
+    if (supportPlaintext) {
+      p.addLast("ssl", new OptionalSslHandler(nettySslContext));
+      LOG.debug("Dual mode SSL handler added for channel: {}", p.channel());
+    } else {
+      p.addLast("ssl", nettySslContext.newHandler(p.channel().alloc()));
+      LOG.debug("SSL handler added for channel: {}", p.channel());
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPCSslFailure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPCSslFailure.java
new file mode 100644
index 00000000000..e16733b2a35
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPCSslFailure.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.exceptions.SSLContextException;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyIPCSslFailure {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestNettyIPCSslFailure.class);
+
+  private static final Configuration CONF = HBaseConfiguration.create();
+
+  private NioEventLoopGroup group;
+
+  private NettyRpcServer server;
+
+  private NettyRpcClient client;
+
+  private TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
+
+  @Before
+  public void setUp() throws IOException, SSLContextException {
+    CONF.set(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, "true");
+    CONF.unset(X509Util.TLS_CONFIG_KEYSTORE_LOCATION);
+    group = new NioEventLoopGroup();
+    server = new NettyRpcServer(null, getClass().getSimpleName(),
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true);
+    NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class);
+    client = new NettyRpcClient(CONF);
+    server.start();
+    stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Closeables.close(client, true);
+    server.stop();
+    group.shutdownGracefully().sync();
+  }
+
+  @Test(expected = ServiceException.class)
+  public void testInitSslThrowsException() throws ServiceException {
+    stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build())
+      .getMessage();
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsIPC.java
new file mode 100644
index 00000000000..abf3259a5d3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsIPC.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+
+import java.net.InetSocketAddress;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.io.crypto.tls.BaseX509ParameterizedTestCase;
+import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@RunWith(Parameterized.class)
+@Category({ SecurityTests.class, MediumTests.class })
+public class TestTlsIPC extends BaseX509ParameterizedTestCase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTlsIPC.class);
+
+  @Parameterized.Parameter()
+  public X509KeyType caKeyType;
+
+  @Parameterized.Parameter(value = 1)
+  public X509KeyType certKeyType;
+
+  @Parameterized.Parameter(value = 2)
+  public String keyPassword;
+
+  @Parameterized.Parameter(value = 3)
+  public Integer paramIndex;
+
+  @Parameterized.Parameters(
+      name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, paramIndex={3}")
+  public static Collection<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    int paramIndex = 0;
+    for (X509KeyType caKeyType : X509KeyType.values()) {
+      for (X509KeyType certKeyType : X509KeyType.values()) {
+        for (String keyPassword : new String[] { KEY_EMPTY_PASSWORD, KEY_NON_EMPTY_PASSWORD }) {
+          params.add(new Object[] { caKeyType, certKeyType, keyPassword, paramIndex++ });
+        }
+      }
+    }
+    return params;
+  }
+
+  private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
+  private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
+  private static final String HOST = "localhost";
+
+  private UserGroupInformation ugi;
+  private Configuration tlsConfiguration;
+  private Configuration clientConf;
+  private Configuration serverConf;
+
+  @Override
+  public void init(X509KeyType caKeyType, X509KeyType certKeyType, String keyPassword,
+    Integer paramIndex) throws Exception {
+    super.init(caKeyType, certKeyType, keyPassword, paramIndex);
+    x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+    tlsConfiguration = x509TestContext.getHbaseConf();
+  }
+
+  @Before
+  public void setUpTest() throws Exception {
+    init(caKeyType, certKeyType, keyPassword, paramIndex);
+    String clientusername = "testuser";
+    ugi =
+      UserGroupInformation.createUserForTesting(clientusername, new String[] { clientusername });
+    clientConf = HBaseConfiguration.create(tlsConfiguration);
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
+    serverConf = HBaseConfiguration.create(tlsConfiguration);
+    serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
+  }
+
+  @After
+  public void cleanUp() {
+    x509TestContext.clearSystemProperties();
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_OCSP);
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_CLR);
+    x509TestContext.getHbaseConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
+    System.clearProperty("com.sun.net.ssl.checkRevocation");
+    System.clearProperty("com.sun.security.enableCRLDP");
+    Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+    Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
+  }
+
+  @Test
+  public void testNoPlaintext() throws Exception {
+    setTLSEncryption(true, false, true);
+    callRpcService(User.create(ugi));
+  }
+
+  @Test
+  public void testRejectPlaintext() {
+    setTLSEncryption(true, false, false);
+    Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
+  }
+
+  @Test
+  public void testAcceptPlaintext() throws Exception {
+    setTLSEncryption(true, true, false);
+    callRpcService(User.create(ugi));
+  }
+
+  void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
+    serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
+    serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
+    clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
+  }
+
+  /**
+   * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
+   * the stub, this function will throw root cause of that exception.
+   */
+  private void callRpcService(User clientUser) throws Exception {
+    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
+
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+
+    RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
+      Lists
+        .newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
+      isa, serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    try (RpcClient rpcClient =
+      RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+        newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
+      TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
+      AtomicReference<Throwable> exception = new AtomicReference<>();
+      Collections.synchronizedList(new ArrayList<Throwable>());
+      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread th, Throwable ex) {
+          exception.set(ex);
+        }
+      };
+      th.setUncaughtExceptionHandler(exceptionHandler);
+      th.start();
+      th.join();
+      if (exception.get() != null) {
+        // throw root cause.
+        while (exception.get().getCause() != null) {
+          exception.set(exception.get().getCause());
+        }
+        throw (Exception) exception.get();
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsWithKerberos.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsWithKerberos.java
new file mode 100644
index 00000000000..f0b8bb12e11
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestTlsWithKerberos.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED;
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
+import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@Category({ SecurityTests.class, LargeTests.class })
+public class TestTlsWithKerberos {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTlsWithKerberos.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static final File KEYTAB_FILE =
+    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+
+  private static MiniKdc KDC;
+  private static final String HOST = "localhost";
+  private static String PRINCIPAL;
+  private static final String RPC_CLIENT_IMPL = NettyRpcClient.class.getName();
+  private static final String RPC_SERVER_IMPL = NettyRpcServer.class.getName();
+
+  private String krbKeytab;
+  private String krbPrincipal;
+  private UserGroupInformation ugi;
+  private Configuration clientConf;
+  private Configuration serverConf;
+  private static X509TestContext x509TestContext;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Security.addProvider(new BouncyCastleProvider());
+    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
+    PRINCIPAL = "hbase/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+    x509TestContext = X509TestContext.newBuilder()
+      .setTempDir(new File(TEST_UTIL.getDataTestDir().toUri().getPath()))
+      .setKeyStorePassword("Pa$$word").setKeyStoreKeyType(X509KeyType.RSA)
+      .setTrustStoreKeyType(X509KeyType.RSA).setTrustStorePassword("Pa$$word").build();
+    x509TestContext.setSystemProperties(KeyStoreFileType.JKS, KeyStoreFileType.JKS);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+    if (KDC != null) {
+      KDC.stop();
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUpTest() throws Exception {
+    krbKeytab = getKeytabFileForTesting();
+    krbPrincipal = getPrincipalForTesting();
+    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+    clientConf = HBaseConfiguration.create(x509TestContext.getHbaseConf());
+    setSecuredConfiguration(clientConf);
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RPC_CLIENT_IMPL);
+    serverConf = HBaseConfiguration.create(x509TestContext.getHbaseConf());
+    setSecuredConfiguration(serverConf);
+    serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, RPC_SERVER_IMPL);
+  }
+
+  @Test
+  public void testNoPlaintext() throws Exception {
+    setRpcProtection("authentication", "authentication");
+    setTLSEncryption(true, false, true);
+    callRpcService(User.create(ugi));
+  }
+
+  @Test
+  public void testRejectPlaintext() {
+    setRpcProtection("authentication", "authentication");
+    setTLSEncryption(true, false, false);
+    Assert.assertThrows(ConnectionClosedException.class, () -> callRpcService(User.create(ugi)));
+  }
+
+  @Test
+  public void testAcceptPlaintext() throws Exception {
+    setRpcProtection("authentication", "authentication");
+    setTLSEncryption(true, true, false);
+    callRpcService(User.create(ugi));
+  }
+
+  void setTLSEncryption(Boolean server, Boolean acceptPlaintext, Boolean client) {
+    serverConf.set(HBASE_SERVER_NETTY_TLS_ENABLED, server.toString());
+    serverConf.set(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, acceptPlaintext.toString());
+    clientConf.set(HBASE_CLIENT_NETTY_TLS_ENABLED, client.toString());
+  }
+
+  void setRpcProtection(String clientProtection, String serverProtection) {
+    clientConf.set("hbase.rpc.protection", clientProtection);
+    serverConf.set("hbase.rpc.protection", serverProtection);
+  }
+
+  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+    throws Exception {
+    Configuration cnf = new Configuration();
+    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(cnf);
+    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
+   * the stub, this function will throw root cause of that exception.
+   */
+  private void callRpcService(User clientUser) throws Exception {
+    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+    Mockito.when(securityInfoMock.getServerPrincipal())
+      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
+
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+
+    RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
+      Lists
+        .newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
+      isa, serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    try (RpcClient rpcClient =
+      RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+        newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
+      TestSecureIPC.TestThread th = new TestSecureIPC.TestThread(stub);
+      AtomicReference<Throwable> exception = new AtomicReference<>();
+      Collections.synchronizedList(new ArrayList<Throwable>());
+      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread th, Throwable ex) {
+          exception.set(ex);
+        }
+      };
+      th.setUncaughtExceptionHandler(exceptionHandler);
+      th.start();
+      th.join();
+      if (exception.get() != null) {
+        // throw root cause.
+        while (exception.get().getCause() != null) {
+          exception.set(exception.get().getCause());
+        }
+        throw (Exception) exception.get();
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 4ce87cf12ec..b397bb57230 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1564,6 +1564,12 @@
         <version>${bouncycastle.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcpkix-jdk15on</artifactId>
+        <version>${bouncycastle.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.kerby</groupId>
         <artifactId>kerb-core</artifactId>