You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/11/28 13:06:40 UTC

[hbase] branch branch-2 updated: HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (branch-2) (#4897)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new e90a351d0db HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (branch-2) (#4897)
e90a351d0db is described below

commit e90a351d0dbc3634a3cd6013e993dae1b85b107c
Author: Andor Molnár <an...@cloudera.com>
AuthorDate: Mon Nov 28 14:06:29 2022 +0100

    HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (branch-2) (#4897)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java    |  17 ++
 .../apache/hadoop/hbase/io/FileChangeWatcher.java  | 251 ++++++++++++++++++
 .../hadoop/hbase/io/crypto/tls/X509Util.java       |  78 +++++-
 .../hadoop/hbase/io/TestFileChangeWatcher.java     | 279 +++++++++++++++++++++
 .../hbase/io/crypto/tls/X509TestContext.java       | 242 +++++++++++-------
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |  33 ++-
 .../hbase/security/TestNettyTLSIPCFileWatcher.java | 234 +++++++++++++++++
 7 files changed, 1045 insertions(+), 89 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index ba387d2bf32..606749315f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
 import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -50,6 +51,8 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
 
   private final boolean shutdownGroupWhenClose;
   private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
+  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
+  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
 
   public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
     MetricsConnection metrics) {
@@ -86,6 +89,14 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
     if (shutdownGroupWhenClose) {
       NettyFutureUtils.consume(group.shutdownGracefully());
     }
+    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
+    if (ks != null) {
+      ks.stop();
+    }
+    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
+    if (ts != null) {
+      ts.stop();
+    }
   }
 
   SslContext getSslContext() throws X509Exception, IOException {
@@ -95,6 +106,12 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
       if (!sslContextForClient.compareAndSet(null, result)) {
         // lost the race, another thread already set the value
         result = sslContextForClient.get();
+      } else if (
+        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
+          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
+      ) {
+        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
+          () -> sslContextForClient.set(null));
       }
     }
     return result;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
new file mode 100644
index 00000000000..77e0e4e750c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.function.Consumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.server.ZooKeeperThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class can be used to watch a directory for file changes. When a file is added
+ * to, deleted from, or is modified in the given directory, the callback provided by the user will
+ * be called from a background thread. Some things to keep in mind:
+ * <ul>
+ * <li>The callback should be thread-safe.</li>
+ * <li>Changes that happen around the time the thread is started may be missed.</li>
+ * <li>There is a delay between a file changing and the callback firing.</li>
+ * <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
+ * </ul>
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
+ *      revision</a>
+ */
+@InterfaceAudience.Private
+public final class FileChangeWatcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
+
+  public enum State {
+    NEW, // object created but start() not called yet
+    STARTING, // start() called but background thread has not entered main loop
+    RUNNING, // background thread is running
+    STOPPING, // stop() called but background thread has not exited main loop
+    STOPPED // stop() called and background thread has exited, or background thread crashed
+  }
+
+  private final WatcherThread watcherThread;
+  private State state; // protected by synchronized(this)
+
+  /**
+   * Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on
+   * changes.
+   * @param dirPath  the directory to watch.
+   * @param callback the callback to invoke with events. <code>event.kind()</code> will return the
+   *                 type of event, and <code>event.context()</code> will return the filename
+   *                 relative to <code>dirPath</code>.
+   * @throws IOException if there is an error creating the WatchService.
+   */
+  public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
+    FileSystem fs = dirPath.getFileSystem();
+    WatchService watchService = fs.newWatchService();
+
+    LOG.debug("Registering with watch service: {}", dirPath);
+
+    dirPath.register(watchService,
+      new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE,
+        StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
+        StandardWatchEventKinds.OVERFLOW });
+    state = State.NEW;
+    this.watcherThread = new WatcherThread(watchService, callback);
+    this.watcherThread.setDaemon(true);
+  }
+
+  /**
+   * Returns the current {@link FileChangeWatcher.State}.
+   * @return the current state.
+   */
+  public synchronized State getState() {
+    return state;
+  }
+
+  /**
+   * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
+   * thus package-private.
+   * @param desiredState the desired state.
+   * @throws InterruptedException if the current thread gets interrupted.
+   */
+  synchronized void waitForState(State desiredState) throws InterruptedException {
+    while (this.state != desiredState) {
+      this.wait();
+    }
+  }
+
+  /**
+   * Sets the state to <code>newState</code>.
+   * @param newState the new state.
+   */
+  private synchronized void setState(State newState) {
+    state = newState;
+    this.notifyAll();
+  }
+
+  /**
+   * Atomically sets the state to <code>update</code> if and only if the state is currently
+   * <code>expected</code>.
+   * @param expected the expected state.
+   * @param update   the new state.
+   * @return true if the update succeeds, or false if the current state does not equal
+   *         <code>expected</code>.
+   */
+  private synchronized boolean compareAndSetState(State expected, State update) {
+    if (state == expected) {
+      setState(update);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Atomically sets the state to <code>update</code> if and only if the state is currently one of
+   * <code>expectedStates</code>.
+   * @param expectedStates the expected states.
+   * @param update         the new state.
+   * @return true if the update succeeds, or false if the current state does not equal any of the
+   *         <code>expectedStates</code>.
+   */
+  private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
+    for (State expected : expectedStates) {
+      if (state == expected) {
+        setState(update);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells the background thread to start. Does not wait for it to be running. Calling this method
+   * more than once has no effect.
+   */
+  public void start() {
+    if (!compareAndSetState(State.NEW, State.STARTING)) {
+      // If previous state was not NEW, start() has already been called.
+      return;
+    }
+    this.watcherThread.start();
+  }
+
+  /**
+   * Tells the background thread to stop. Does not wait for it to exit.
+   */
+  public void stop() {
+    if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
+      watcherThread.interrupt();
+    }
+  }
+
+  /**
+   * Inner class that implements the watcher thread logic.
+   */
+  private class WatcherThread extends ZooKeeperThread {
+
+    private static final String THREAD_NAME = "FileChangeWatcher";
+
+    final WatchService watchService;
+    final Consumer<WatchEvent<?>> callback;
+
+    WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
+      super(THREAD_NAME);
+      this.watchService = watchService;
+      this.callback = callback;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("{} thread started", getName());
+        if (
+          !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
+        ) {
+          // stop() called shortly after start(), before
+          // this thread started running.
+          FileChangeWatcher.State state = FileChangeWatcher.this.getState();
+          if (state != FileChangeWatcher.State.STOPPING) {
+            throw new IllegalStateException("Unexpected state: " + state);
+          }
+          return;
+        }
+        runLoop();
+      } catch (Exception e) {
+        LOG.warn("Error in runLoop()", e);
+        throw e;
+      } finally {
+        try {
+          watchService.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing watch service", e);
+        }
+        LOG.info("{} thread finished", getName());
+        FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+
+    private void runLoop() {
+      while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
+        WatchKey key;
+        try {
+          key = watchService.take();
+        } catch (InterruptedException | ClosedWatchServiceException e) {
+          LOG.debug("{} was interrupted and is shutting down...", getName());
+          break;
+        }
+        for (WatchEvent<?> event : key.pollEvents()) {
+          LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context());
+          try {
+            callback.accept(event);
+          } catch (Throwable e) {
+            LOG.error("Error from callback", e);
+          }
+        }
+        boolean isKeyValid = key.reset();
+        if (!isKeyValid) {
+          // This is likely a problem, it means that file reloading is broken, probably because the
+          // directory we are watching was deleted or otherwise became inaccessible (unmounted,
+          // permissions
+          // changed, ???).
+          // For now, we log an error and exit the watcher thread.
+          LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
+          break;
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
index 96aa66364be..00a59acf41a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hbase.io.crypto.tls;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.Security;
@@ -25,6 +29,7 @@ import java.security.cert.PKIXBuilderParameters;
 import java.security.cert.X509CertSelector;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.CertPathTrustManagerParameters;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
@@ -38,6 +43,7 @@ import org.apache.hadoop.hbase.exceptions.KeyManagerException;
 import org.apache.hadoop.hbase.exceptions.SSLContextException;
 import org.apache.hadoop.hbase.exceptions.TrustManagerException;
 import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +85,7 @@ public final class X509Util {
     CONFIG_PREFIX + "host-verification.reverse-dns.enabled";
   private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + "enabledProtocols";
   private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + "ciphersuites";
+  public static final String TLS_CERT_RELOAD = CONFIG_PREFIX + "certReload";
   public static final String DEFAULT_PROTOCOL = "TLSv1.2";
 
   //
@@ -244,7 +251,6 @@ public final class X509Util {
     }
 
     SslContextBuilder sslContextBuilder;
-
     sslContextBuilder = SslContextBuilder
       .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
 
@@ -395,4 +401,74 @@ public final class X509Util {
       return cipherSuitesInput.split(",");
     }
   }
+
+  /**
+   * Enable certificate file reloading by creating FileWatchers for keystore and truststore.
+   * AtomicReferences will be set with the new instances. resetContext - if not null - will be
+   * called when the file has been modified.
+   * @param keystoreWatcher   Reference to keystoreFileWatcher.
+   * @param trustStoreWatcher Reference to truststoreFileWatcher.
+   * @param resetContext      Callback for file changes.
+   */
+  public static void enableCertFileReloading(Configuration config,
+    AtomicReference<FileChangeWatcher> keystoreWatcher,
+    AtomicReference<FileChangeWatcher> trustStoreWatcher, Runnable resetContext)
+    throws IOException {
+    String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
+    keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
+    String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
+    trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
+  }
+
+  private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
+    throws IOException {
+    if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) {
+      return null;
+    }
+    final Path filePath = Paths.get(fileLocation).toAbsolutePath();
+    Path parentPath = filePath.getParent();
+    if (parentPath == null) {
+      throw new IOException("Key/trust store path does not have a parent: " + filePath);
+    }
+    FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> {
+      handleWatchEvent(filePath, watchEvent, resetContext);
+    });
+    fileChangeWatcher.start();
+    return fileChangeWatcher;
+  }
+
+  /**
+   * Handler for watch events that let us know a file we may care about has changed on disk.
+   * @param filePath the path to the file we are watching for changes.
+   * @param event    the WatchEvent.
+   */
+  private static void handleWatchEvent(Path filePath, WatchEvent<?> event, Runnable resetContext) {
+    boolean shouldResetContext = false;
+    Path dirPath = filePath.getParent();
+    if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
+      // If we get notified about possibly missed events, reload the key store / trust store just to
+      // be sure.
+      shouldResetContext = true;
+    } else if (
+      event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY)
+        || event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)
+    ) {
+      Path eventFilePath = dirPath.resolve((Path) event.context());
+      if (filePath.equals(eventFilePath)) {
+        shouldResetContext = true;
+      }
+    }
+    // Note: we don't care about delete events
+    if (shouldResetContext) {
+      LOG.info(
+        "Attempting to reset default SSL context after receiving watch event: {} with context: {}",
+        event.kind(), event.context());
+      resetContext.run();
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring watch event and keeping previous default SSL context. "
+          + "Event kind: {} with context: {}", event.kind(), event.context());
+      }
+    }
+  }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
new file mode 100644
index 00000000000..0d94a550d79
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
+ *      revision</a>
+ */
+@Category({ IOTests.class, SmallTests.class })
+public class TestFileChangeWatcher {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
+
+  private static File tempDir;
+  private static File tempFile;
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
+  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private static final long FS_TIMEOUT = 30000L;
+
+  @BeforeClass
+  public static void createTempFile() throws IOException {
+    tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString())
+      .getCanonicalFile();
+    FileUtils.forceMkdir(tempDir);
+    tempFile = File.createTempFile("zk_test_", "", tempDir);
+  }
+
+  @AfterClass
+  public static void cleanupTempDir() {
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
+    FileChangeWatcher watcher = null;
+    try {
+      final List<WatchEvent<?>> events = new ArrayList<>();
+      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+        LOG.info("Got an update: {} {}", event.kind(), event.context());
+        // Filter out the extra ENTRY_CREATE events that are
+        // sometimes seen at the start. Even though we create the watcher
+        // after the file exists, sometimes we still get a create event.
+        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+          return;
+        }
+        synchronized (events) {
+          events.add(event);
+          events.notifyAll();
+        }
+      });
+      watcher.start();
+      watcher.waitForState(FileChangeWatcher.State.RUNNING);
+      Thread.sleep(1000L); // TODO hack
+      for (int i = 0; i < 3; i++) {
+        LOG.info("Modifying file, attempt {}", (i + 1));
+        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
+          true);
+        synchronized (events) {
+          if (events.size() < i + 1) {
+            events.wait(FS_TIMEOUT);
+          }
+          assertEquals("Wrong number of events", i + 1, events.size());
+          WatchEvent<?> event = events.get(i);
+          assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
+          assertEquals(tempFile.getName(), event.context().toString());
+        }
+      }
+    } finally {
+      if (watcher != null) {
+        watcher.stop();
+        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+  }
+
+  @Test
+  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
+    FileChangeWatcher watcher = null;
+    try {
+      final List<WatchEvent<?>> events = new ArrayList<>();
+      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+        LOG.info("Got an update: {} {}", event.kind(), event.context());
+        // Filter out the extra ENTRY_CREATE events that are
+        // sometimes seen at the start. Even though we create the watcher
+        // after the file exists, sometimes we still get a create event.
+        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+          return;
+        }
+        synchronized (events) {
+          events.add(event);
+          events.notifyAll();
+        }
+      });
+      watcher.start();
+      watcher.waitForState(FileChangeWatcher.State.RUNNING);
+      Thread.sleep(1000L); // TODO hack
+      LOG.info("Touching file");
+      FileUtils.touch(tempFile);
+      synchronized (events) {
+        if (events.isEmpty()) {
+          events.wait(FS_TIMEOUT);
+        }
+        assertFalse(events.isEmpty());
+        WatchEvent<?> event = events.get(0);
+        assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
+        assertEquals(tempFile.getName(), event.context().toString());
+      }
+    } finally {
+      if (watcher != null) {
+        watcher.stop();
+        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+  }
+
+  @Test
+  public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException {
+    FileChangeWatcher watcher = null;
+    try {
+      final List<WatchEvent<?>> events = new ArrayList<>();
+      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+        LOG.info("Got an update: {} {}", event.kind(), event.context());
+        synchronized (events) {
+          events.add(event);
+          events.notifyAll();
+        }
+      });
+      watcher.start();
+      watcher.waitForState(FileChangeWatcher.State.RUNNING);
+      Thread.sleep(1000L); // TODO hack
+      File tempFile2 = File.createTempFile("zk_test_", "", tempDir);
+      tempFile2.deleteOnExit();
+      synchronized (events) {
+        if (events.isEmpty()) {
+          events.wait(FS_TIMEOUT);
+        }
+        assertFalse(events.isEmpty());
+        WatchEvent<?> event = events.get(0);
+        assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind());
+        assertEquals(tempFile2.getName(), event.context().toString());
+      }
+    } finally {
+      if (watcher != null) {
+        watcher.stop();
+        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+  }
+
+  @Test
+  public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException {
+    FileChangeWatcher watcher = null;
+    try {
+      final List<WatchEvent<?>> events = new ArrayList<>();
+      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+        LOG.info("Got an update: {} {}", event.kind(), event.context());
+        // Filter out the extra ENTRY_CREATE events that are
+        // sometimes seen at the start. Even though we create the watcher
+        // after the file exists, sometimes we still get a create event.
+        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+          return;
+        }
+        synchronized (events) {
+          events.add(event);
+          events.notifyAll();
+        }
+      });
+      watcher.start();
+      watcher.waitForState(FileChangeWatcher.State.RUNNING);
+      Thread.sleep(1000L); // TODO hack
+      tempFile.delete();
+      synchronized (events) {
+        if (events.isEmpty()) {
+          events.wait(FS_TIMEOUT);
+        }
+        assertFalse(events.isEmpty());
+        WatchEvent<?> event = events.get(0);
+        assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind());
+        assertEquals(tempFile.getName(), event.context().toString());
+      }
+    } finally {
+      if (watcher != null) {
+        watcher.stop();
+        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+  }
+
+  @Test
+  public void testCallbackErrorDoesNotCrashWatcherThread()
+    throws IOException, InterruptedException {
+    FileChangeWatcher watcher = null;
+    try {
+      final AtomicInteger callCount = new AtomicInteger(0);
+      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+        LOG.info("Got an update: {} {}", event.kind(), event.context());
+        int oldValue;
+        synchronized (callCount) {
+          oldValue = callCount.getAndIncrement();
+          callCount.notifyAll();
+        }
+        if (oldValue == 0) {
+          throw new RuntimeException("This error should not crash the watcher thread");
+        }
+      });
+      watcher.start();
+      watcher.waitForState(FileChangeWatcher.State.RUNNING);
+      Thread.sleep(1000L); // TODO hack
+      LOG.info("Modifying file");
+      FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
+      synchronized (callCount) {
+        while (callCount.get() == 0) {
+          callCount.wait(FS_TIMEOUT);
+        }
+      }
+      LOG.info("Modifying file again");
+      FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
+      synchronized (callCount) {
+        if (callCount.get() == 1) {
+          callCount.wait(FS_TIMEOUT);
+        }
+      }
+      // The value of callCount can exceed 1 only if the callback thread
+      // survives the exception thrown by the first callback.
+      assertTrue(callCount.get() > 1);
+    } finally {
+      if (watcher != null) {
+        watcher.stop();
+        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      }
+    }
+  }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
index 0185ebff0ec..ad4ffe0ab5a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
@@ -57,16 +57,16 @@ public final class X509TestContext {
   private final File tempDir;
   private final Configuration conf;
 
-  private final X509Certificate trustStoreCertificate;
+  private X509Certificate trustStoreCertificate;
   private final char[] trustStorePassword;
-  private final KeyPair trustStoreKeyPair;
+  private KeyPair trustStoreKeyPair;
   private File trustStoreJksFile;
   private File trustStorePemFile;
   private File trustStorePkcs12File;
   private File trustStoreBcfksFile;
 
-  private final KeyPair keyStoreKeyPair;
-  private final X509Certificate keyStoreCertificate;
+  private KeyPair keyStoreKeyPair;
+  private X509Certificate keyStoreCertificate;
   private final char[] keyStorePassword;
   private File keyStoreJksFile;
   private File keyStorePemFile;
@@ -101,16 +101,8 @@ public final class X509TestContext {
     this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair);
     this.keyStorePassword = requireNonNull(keyStorePassword);
 
-    X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
-    caNameBuilder.addRDN(BCStyle.CN,
-      MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
-    trustStoreCertificate =
-      X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+    createCertificates();
 
-    X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
-    nameBuilder.addRDN(BCStyle.CN,
-      MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
-    keyStoreCertificate = newCert(nameBuilder.build());
     trustStorePkcs12File = null;
     trustStorePemFile = null;
     trustStoreJksFile = null;
@@ -197,74 +189,85 @@ public final class X509TestContext {
 
   private File getTrustStoreJksFile() throws IOException {
     if (trustStoreJksFile == null) {
-      File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
+      trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
         KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
       trustStoreJksFile.deleteOnExit();
-      try (
-        final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
-        byte[] bytes =
-          X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
-        trustStoreOutputStream.write(bytes);
-        trustStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.trustStoreJksFile = trustStoreJksFile;
+      generateTrustStoreJksFile();
     }
     return trustStoreJksFile;
   }
 
+  private void generateTrustStoreJksFile() throws IOException {
+    try (final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
+      byte[] bytes =
+        X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+      trustStoreOutputStream.write(bytes);
+      trustStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   private File getTrustStorePemFile() throws IOException {
     if (trustStorePemFile == null) {
-      File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
+      trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
         KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
       trustStorePemFile.deleteOnExit();
-      FileUtils.writeStringToFile(trustStorePemFile,
-        X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
-        false);
-      this.trustStorePemFile = trustStorePemFile;
+      generateTrustStorePemFile();
     }
     return trustStorePemFile;
   }
 
+  private void generateTrustStorePemFile() throws IOException {
+    FileUtils.writeStringToFile(trustStorePemFile,
+      X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
+      false);
+  }
+
   private File getTrustStorePkcs12File() throws IOException {
     if (trustStorePkcs12File == null) {
-      File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
+      trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
         KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
       trustStorePkcs12File.deleteOnExit();
-      try (final FileOutputStream trustStoreOutputStream =
-        new FileOutputStream(trustStorePkcs12File)) {
-        byte[] bytes =
-          X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
-        trustStoreOutputStream.write(bytes);
-        trustStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.trustStorePkcs12File = trustStorePkcs12File;
+      generateTrustStorePkcs12File();
     }
     return trustStorePkcs12File;
   }
 
+  private void generateTrustStorePkcs12File() throws IOException {
+    try (
+      final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStorePkcs12File)) {
+      byte[] bytes =
+        X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
+      trustStoreOutputStream.write(bytes);
+      trustStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   private File getTrustStoreBcfksFile() throws IOException {
     if (trustStoreBcfksFile == null) {
-      File trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX,
+      trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX,
         KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
       trustStoreBcfksFile.deleteOnExit();
-      try (
-        final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) {
-        byte[] bytes =
-          X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword);
-        trustStoreOutputStream.write(bytes);
-        trustStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.trustStoreBcfksFile = trustStoreBcfksFile;
+      generateTrustStoreBcfksFile();
     }
     return trustStoreBcfksFile;
   }
 
+  private void generateTrustStoreBcfksFile() throws IOException {
+    try (
+      final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) {
+      byte[] bytes =
+        X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+      trustStoreOutputStream.write(bytes);
+      trustStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   public X509Certificate getKeyStoreCertificate() {
     return keyStoreCertificate;
   }
@@ -307,33 +310,32 @@ public final class X509TestContext {
 
   private File getKeyStoreJksFile() throws IOException {
     if (keyStoreJksFile == null) {
-      File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
+      keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
         KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
       keyStoreJksFile.deleteOnExit();
-      try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
-        byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
-          keyStoreKeyPair.getPrivate(), keyStorePassword);
-        keyStoreOutputStream.write(bytes);
-        keyStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.keyStoreJksFile = keyStoreJksFile;
+      generateKeyStoreJksFile();
     }
     return keyStoreJksFile;
   }
 
+  private void generateKeyStoreJksFile() throws IOException {
+    try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
+      byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
+        keyStoreKeyPair.getPrivate(), keyStorePassword);
+      keyStoreOutputStream.write(bytes);
+      keyStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   private File getKeyStorePemFile() throws IOException {
     if (keyStorePemFile == null) {
       try {
-        File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
+        keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
           KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
         keyStorePemFile.deleteOnExit();
-        FileUtils.writeStringToFile(keyStorePemFile,
-          X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate,
-            keyStoreKeyPair.getPrivate(), keyStorePassword),
-          StandardCharsets.US_ASCII, false);
-        this.keyStorePemFile = keyStorePemFile;
+        generateKeyStorePemFile();
       } catch (OperatorCreationException e) {
         throw new IOException(e);
       }
@@ -341,42 +343,55 @@ public final class X509TestContext {
     return keyStorePemFile;
   }
 
+  private void generateKeyStorePemFile() throws IOException, OperatorCreationException {
+    FileUtils.writeStringToFile(keyStorePemFile,
+      X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, keyStoreKeyPair.getPrivate(),
+        keyStorePassword),
+      StandardCharsets.US_ASCII, false);
+  }
+
   private File getKeyStorePkcs12File() throws IOException {
     if (keyStorePkcs12File == null) {
-      File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
+      keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
         KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
       keyStorePkcs12File.deleteOnExit();
-      try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
-        byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
-          keyStoreKeyPair.getPrivate(), keyStorePassword);
-        keyStoreOutputStream.write(bytes);
-        keyStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.keyStorePkcs12File = keyStorePkcs12File;
+      generateKeyStorePkcs12File();
     }
     return keyStorePkcs12File;
   }
 
+  private void generateKeyStorePkcs12File() throws IOException {
+    try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
+      byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
+        keyStoreKeyPair.getPrivate(), keyStorePassword);
+      keyStoreOutputStream.write(bytes);
+      keyStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   private File getKeyStoreBcfksFile() throws IOException {
     if (keyStoreBcfksFile == null) {
-      File keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX,
+      keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX,
         KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
       keyStoreBcfksFile.deleteOnExit();
-      try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) {
-        byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate,
-          keyStoreKeyPair.getPrivate(), keyStorePassword);
-        keyStoreOutputStream.write(bytes);
-        keyStoreOutputStream.flush();
-      } catch (GeneralSecurityException e) {
-        throw new IOException(e);
-      }
-      this.keyStoreBcfksFile = keyStoreBcfksFile;
+      generateKeyStoreBcfksFile();
     }
     return keyStoreBcfksFile;
   }
 
+  private void generateKeyStoreBcfksFile() throws IOException {
+    try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) {
+      byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate,
+        keyStoreKeyPair.getPrivate(), keyStorePassword);
+      keyStoreOutputStream.write(bytes);
+      keyStoreOutputStream.flush();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+
   /**
    * Sets the SSL system properties such that the given X509Util object can be used to create SSL
    * Contexts that will use the trust store and key store files created by this test context.
@@ -445,6 +460,59 @@ public final class X509TestContext {
       keyStoreKeyPair, keyStorePassword, cert);
   }
 
+  public void regenerateStores(X509KeyType keyStoreKeyType, X509KeyType trustStoreKeyType,
+    KeyStoreFileType keyStoreFileType, KeyStoreFileType trustStoreFileType)
+    throws GeneralSecurityException, IOException, OperatorCreationException {
+
+    trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
+    keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
+    createCertificates();
+
+    switch (keyStoreFileType) {
+      case JKS:
+        generateKeyStoreJksFile();
+        break;
+      case PEM:
+        generateKeyStorePemFile();
+        break;
+      case BCFKS:
+        generateKeyStoreBcfksFile();
+        break;
+      case PKCS12:
+        generateKeyStorePkcs12File();
+        break;
+    }
+
+    switch (trustStoreFileType) {
+      case JKS:
+        generateTrustStoreJksFile();
+        break;
+      case PEM:
+        generateTrustStorePemFile();
+        break;
+      case PKCS12:
+        generateTrustStorePkcs12File();
+        break;
+      case BCFKS:
+        generateTrustStoreBcfksFile();
+        break;
+    }
+  }
+
+  private void createCertificates()
+    throws GeneralSecurityException, IOException, OperatorCreationException {
+    X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    caNameBuilder.addRDN(BCStyle.CN,
+      MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
+    trustStoreCertificate =
+      X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+
+    X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+    nameBuilder.addRDN(BCStyle.CN,
+      MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
+    keyStoreCertificate = newCert(nameBuilder.build());
+  }
+
   /**
    * Builder class, used for creating new instances of X509TestContext.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index e2f76ca0f10..d42b03d4d5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -25,11 +25,13 @@ import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
 import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -92,6 +94,9 @@ public class NettyRpcServer extends RpcServer {
   private final Channel serverChannel;
   final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
   private final ByteBufAllocator channelAllocator;
+  private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>();
+  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
+  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
 
   public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
     InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
@@ -199,6 +204,14 @@ public class NettyRpcServer extends RpcServer {
       return;
     }
     LOG.info("Stopping server on " + this.serverChannel.localAddress());
+    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
+    if (ks != null) {
+      ks.stop();
+    }
+    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
+    if (ts != null) {
+      ts.stop();
+    }
     if (authTokenSecretMgr != null) {
       authTokenSecretMgr.stop();
       authTokenSecretMgr = null;
@@ -248,7 +261,7 @@ public class NettyRpcServer extends RpcServer {
 
   private void initSSL(ChannelPipeline p, boolean supportPlaintext)
     throws X509Exception, IOException {
-    SslContext nettySslContext = X509Util.createSslContextForServer(conf);
+    SslContext nettySslContext = getSslContext();
 
     if (supportPlaintext) {
       p.addLast("ssl", new OptionalSslHandler(nettySslContext));
@@ -258,4 +271,22 @@ public class NettyRpcServer extends RpcServer {
       LOG.debug("SSL handler added for channel: {}", p.channel());
     }
   }
+
+  SslContext getSslContext() throws X509Exception, IOException {
+    SslContext result = sslContextForServer.get();
+    if (result == null) {
+      result = X509Util.createSslContextForServer(conf);
+      if (!sslContextForServer.compareAndSet(null, result)) {
+        // lost the race, another thread already set the value
+        result = sslContextForServer.get();
+      } else if (
+        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
+          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
+      ) {
+        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
+          () -> sslContextForServer.set(null));
+      }
+    }
+    return result;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java
new file mode 100644
index 00000000000..34c812a3bb9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyTLSIPCFileWatcher {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestNettyTLSIPCFileWatcher.class);
+
+  private static final Configuration CONF = HBaseConfiguration.create();
+  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(CONF);
+  private static HRegionServer SERVER;
+  private static X509TestContextProvider PROVIDER;
+  private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
+
+  private X509TestContext x509TestContext;
+
+  @Parameterized.Parameter(0)
+  public X509KeyType keyType;
+
+  @Parameterized.Parameter(1)
+  public KeyStoreFileType storeFileType;
+
+  @Parameterized.Parameters(name = "{index}: keyType={0}, storeFileType={1}")
+  public static List<Object[]> data() {
+    List<Object[]> params = new ArrayList<>();
+    for (X509KeyType caKeyType : X509KeyType.values()) {
+      for (KeyStoreFileType ks : KeyStoreFileType.values()) {
+        params.add(new Object[] { caKeyType, ks });
+      }
+    }
+    return params;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    Security.addProvider(new BouncyCastleProvider());
+    File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
+      .getCanonicalFile();
+    FileUtils.forceMkdir(dir);
+    // server must enable tls
+    CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
+    PROVIDER = new X509TestContextProvider(CONF, dir);
+    EVENT_LOOP_GROUP_CONFIG =
+      new NettyEventLoopGroupConfig(CONF, TestNettyTLSIPCFileWatcher.class.getSimpleName());
+    SERVER = mock(HRegionServer.class);
+    when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws InterruptedException {
+    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+    EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync();
+    UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray());
+    x509TestContext.setConfigurations(storeFileType, storeFileType);
+    CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
+    CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true);
+    CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true);
+  }
+
+  @After
+  public void tearDown() {
+    x509TestContext.clearConfigurations();
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
+    x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
+    System.clearProperty("com.sun.net.ssl.checkRevocation");
+    System.clearProperty("com.sun.security.enableCRLDP");
+    Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+    Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
+  }
+
+  @Test
+  public void testReplaceServerKeystore()
+    throws IOException, ServiceException, GeneralSecurityException, OperatorCreationException {
+    Configuration clientConf = new Configuration(CONF);
+    RpcServer rpcServer = createRpcServer("testRpcServer",
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+
+    try {
+      rpcServer.start();
+
+      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          newBlockingStub(client, rpcServer.getListenerAddress());
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        String message = "hello";
+        assertEquals(message,
+          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+            .getMessage());
+        assertNull(pcrc.cellScanner());
+      }
+
+      // Replace keystore
+      x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
+
+      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          newBlockingStub(client, rpcServer.getListenerAddress());
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        String message = "hello";
+        assertEquals(message,
+          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+            .getMessage());
+        assertNull(pcrc.cellScanner());
+      }
+
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testReplaceClientAndServerKeystore()
+    throws GeneralSecurityException, IOException, OperatorCreationException, ServiceException {
+    Configuration clientConf = new Configuration(CONF);
+    RpcServer rpcServer = createRpcServer("testRpcServer",
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+
+    try {
+      rpcServer.start();
+
+      try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+          newBlockingStub(client, rpcServer.getListenerAddress());
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        String message = "hello";
+        assertEquals(message,
+          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+            .getMessage());
+        assertNull(pcrc.cellScanner());
+
+        // Replace keystore and cancel client connections
+        x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
+        client.cancelConnections(
+          ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L));
+
+        assertEquals(message,
+          stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+            .getMessage());
+        assertNull(pcrc.cellScanner());
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  private RpcServer createRpcServer(String name,
+    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
+    Configuration conf, RpcScheduler scheduler) throws IOException {
+    return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
+  }
+}