You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2022/11/11 11:22:24 UTC
[hbase] branch master updated: HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (#4869)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 047f4e22e02 HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (#4869)
047f4e22e02 is described below
commit 047f4e22e0271ab0dfbf6e7843ae37c008c57c69
Author: Andor Molnár <an...@cloudera.com>
AuthorDate: Fri Nov 11 12:22:14 2022 +0100
HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (#4869)
Signed-off-by: wchevreuil@apache.org
---
.../apache/hadoop/hbase/ipc/NettyRpcClient.java | 17 ++
.../apache/hadoop/hbase/io/FileChangeWatcher.java | 251 ++++++++++++++++++
.../hadoop/hbase/io/crypto/tls/X509Util.java | 79 +++++-
.../hadoop/hbase/io/TestFileChangeWatcher.java | 279 +++++++++++++++++++++
.../hbase/io/crypto/tls/X509TestContext.java | 242 +++++++++++-------
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 33 ++-
.../hbase/security/TestNettyTLSIPCFileWatcher.java | 234 +++++++++++++++++
7 files changed, 1046 insertions(+), 89 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 23eaa5a649f..231caa40a89 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -49,6 +50,8 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
private final boolean shutdownGroupWhenClose;
private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
+ private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
+ private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
@@ -85,6 +88,14 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
if (shutdownGroupWhenClose) {
group.shutdownGracefully();
}
+ FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
+ if (ks != null) {
+ ks.stop();
+ }
+ FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
+ if (ts != null) {
+ ts.stop();
+ }
}
SslContext getSslContext() throws X509Exception, IOException {
@@ -94,6 +105,12 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
if (!sslContextForClient.compareAndSet(null, result)) {
// lost the race, another thread already set the value
result = sslContextForClient.get();
+ } else if (
+ keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
+ && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
+ ) {
+ X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
+ () -> sslContextForClient.set(null));
}
}
return result;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
new file mode 100644
index 00000000000..77e0e4e750c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.function.Consumer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.server.ZooKeeperThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class can be used to watch a directory for file changes. When a file is added
+ * to, deleted from, or is modified in the given directory, the callback provided by the user will
+ * be called from a background thread. Some things to keep in mind:
+ * <ul>
+ * <li>The callback should be thread-safe.</li>
+ * <li>Changes that happen around the time the thread is started may be missed.</li>
+ * <li>There is a delay between a file changing and the callback firing.</li>
+ * <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
+ * </ul>
+ * <p/>
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ * "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
+ * revision</a>
+ */
+@InterfaceAudience.Private
+public final class FileChangeWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
+
+ public enum State {
+ NEW, // object created but start() not called yet
+ STARTING, // start() called but background thread has not entered main loop
+ RUNNING, // background thread is running
+ STOPPING, // stop() called but background thread has not exited main loop
+ STOPPED // stop() called and background thread has exited, or background thread crashed
+ }
+
+ private final WatcherThread watcherThread;
+ private State state; // protected by synchronized(this)
+
+ /**
+ * Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on
+ * changes.
+ * @param dirPath the directory to watch.
+ * @param callback the callback to invoke with events. <code>event.kind()</code> will return the
+ * type of event, and <code>event.context()</code> will return the filename
+ * relative to <code>dirPath</code>.
+ * @throws IOException if there is an error creating the WatchService.
+ */
+ public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
+ FileSystem fs = dirPath.getFileSystem();
+ WatchService watchService = fs.newWatchService();
+
+ LOG.debug("Registering with watch service: {}", dirPath);
+
+ dirPath.register(watchService,
+ new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE,
+ StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
+ StandardWatchEventKinds.OVERFLOW });
+ state = State.NEW;
+ this.watcherThread = new WatcherThread(watchService, callback);
+ this.watcherThread.setDaemon(true);
+ }
+
+ /**
+ * Returns the current {@link FileChangeWatcher.State}.
+ * @return the current state.
+ */
+ public synchronized State getState() {
+ return state;
+ }
+
+ /**
+ * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
+ * thus package-private.
+ * @param desiredState the desired state.
+ * @throws InterruptedException if the current thread gets interrupted.
+ */
+ synchronized void waitForState(State desiredState) throws InterruptedException {
+ while (this.state != desiredState) {
+ this.wait();
+ }
+ }
+
+ /**
+ * Sets the state to <code>newState</code>.
+ * @param newState the new state.
+ */
+ private synchronized void setState(State newState) {
+ state = newState;
+ this.notifyAll();
+ }
+
+ /**
+ * Atomically sets the state to <code>update</code> if and only if the state is currently
+ * <code>expected</code>.
+ * @param expected the expected state.
+ * @param update the new state.
+ * @return true if the update succeeds, or false if the current state does not equal
+ * <code>expected</code>.
+ */
+ private synchronized boolean compareAndSetState(State expected, State update) {
+ if (state == expected) {
+ setState(update);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Atomically sets the state to <code>update</code> if and only if the state is currently one of
+ * <code>expectedStates</code>.
+ * @param expectedStates the expected states.
+ * @param update the new state.
+ * @return true if the update succeeds, or false if the current state does not equal any of the
+ * <code>expectedStates</code>.
+ */
+ private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
+ for (State expected : expectedStates) {
+ if (state == expected) {
+ setState(update);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Tells the background thread to start. Does not wait for it to be running. Calling this method
+ * more than once has no effect.
+ */
+ public void start() {
+ if (!compareAndSetState(State.NEW, State.STARTING)) {
+ // If previous state was not NEW, start() has already been called.
+ return;
+ }
+ this.watcherThread.start();
+ }
+
+ /**
+ * Tells the background thread to stop. Does not wait for it to exit.
+ */
+ public void stop() {
+ if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
+ watcherThread.interrupt();
+ }
+ }
+
+ /**
+ * Inner class that implements the watcher thread logic.
+ */
+ private class WatcherThread extends ZooKeeperThread {
+
+ private static final String THREAD_NAME = "FileChangeWatcher";
+
+ final WatchService watchService;
+ final Consumer<WatchEvent<?>> callback;
+
+ WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
+ super(THREAD_NAME);
+ this.watchService = watchService;
+ this.callback = callback;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("{} thread started", getName());
+ if (
+ !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
+ ) {
+ // stop() called shortly after start(), before
+ // this thread started running.
+ FileChangeWatcher.State state = FileChangeWatcher.this.getState();
+ if (state != FileChangeWatcher.State.STOPPING) {
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+ return;
+ }
+ runLoop();
+ } catch (Exception e) {
+ LOG.warn("Error in runLoop()", e);
+ throw e;
+ } finally {
+ try {
+ watchService.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing watch service", e);
+ }
+ LOG.info("{} thread finished", getName());
+ FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+
+ private void runLoop() {
+ while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
+ WatchKey key;
+ try {
+ key = watchService.take();
+ } catch (InterruptedException | ClosedWatchServiceException e) {
+ LOG.debug("{} was interrupted and is shutting down...", getName());
+ break;
+ }
+ for (WatchEvent<?> event : key.pollEvents()) {
+ LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context());
+ try {
+ callback.accept(event);
+ } catch (Throwable e) {
+ LOG.error("Error from callback", e);
+ }
+ }
+ boolean isKeyValid = key.reset();
+ if (!isKeyValid) {
+ // This is likely a problem, it means that file reloading is broken, probably because the
+ // directory we are watching was deleted or otherwise became inaccessible (unmounted,
+ // permissions
+ // changed, ???).
+ // For now, we log an error and exit the watcher thread.
+ LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
index 96aa66364be..f120b457b5a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.io.crypto.tls;
import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Security;
@@ -25,6 +29,7 @@ import java.security.cert.PKIXBuilderParameters;
import java.security.cert.X509CertSelector;
import java.util.Arrays;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.CertPathTrustManagerParameters;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
@@ -38,6 +43,7 @@ import org.apache.hadoop.hbase.exceptions.KeyManagerException;
import org.apache.hadoop.hbase.exceptions.SSLContextException;
import org.apache.hadoop.hbase.exceptions.TrustManagerException;
import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +85,7 @@ public final class X509Util {
CONFIG_PREFIX + "host-verification.reverse-dns.enabled";
private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + "enabledProtocols";
private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + "ciphersuites";
+ public static final String TLS_CERT_RELOAD = CONFIG_PREFIX + "certReload";
public static final String DEFAULT_PROTOCOL = "TLSv1.2";
//
@@ -244,7 +251,6 @@ public final class X509Util {
}
SslContextBuilder sslContextBuilder;
-
sslContextBuilder = SslContextBuilder
.forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType));
@@ -395,4 +401,75 @@ public final class X509Util {
return cipherSuitesInput.split(",");
}
}
+
+ /**
+ * Enable certificate file reloading by creating FileWatchers for keystore and truststore.
+ * AtomicReferences will be set with the new instances. resetContext - if not null - will be
+ * called when the file has been modified.
+ * @param keystoreWatcher Reference to keystoreFileWatcher.
+ * @param trustStoreWatcher Reference to truststoreFileWatcher.
+ * @param resetContext Callback for file changes.
+ */
+ public static void enableCertFileReloading(Configuration config,
+ AtomicReference<FileChangeWatcher> keystoreWatcher,
+ AtomicReference<FileChangeWatcher> trustStoreWatcher, Runnable resetContext)
+ throws IOException {
+ String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
+ keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
+ String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
+ trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
+ }
+
+ private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
+ throws IOException {
+ if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) {
+ return null;
+ }
+ final Path filePath = Paths.get(fileLocation).toAbsolutePath();
+ Path parentPath = filePath.getParent();
+ if (parentPath == null) {
+ throw new IOException("Key/trust store path does not have a parent: " + filePath);
+ }
+ FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> {
+ handleWatchEvent(filePath, watchEvent, resetContext);
+ });
+ fileChangeWatcher.start();
+ return fileChangeWatcher;
+ }
+
+ /**
+ * Handler for watch events that let us know a file we may care about has changed on disk.
+ * @param filePath the path to the file we are watching for changes.
+ * @param event the WatchEvent.
+ */
+ private static void handleWatchEvent(Path filePath, WatchEvent<?> event, Runnable resetContext) {
+ boolean shouldResetContext = false;
+ Path dirPath = filePath.getParent();
+ if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
+ // If we get notified about possibly missed events, reload the key store / trust store just to
+ // be sure.
+ shouldResetContext = true;
+ } else if (
+ event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY)
+ || event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)
+ ) {
+ Path eventFilePath = dirPath.resolve((Path) event.context());
+ if (filePath.equals(eventFilePath)) {
+ shouldResetContext = true;
+ }
+ }
+ // Note: we don't care about delete events
+ if (shouldResetContext) {
+ LOG.info(
+ "Attempting to reset default SSL context after receiving watch event: {} with context: {}",
+ event.kind(), event.context());
+ resetContext.run();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Ignoring watch event and keeping previous default SSL context. Event kind: {} with context: {}",
+ event.kind(), event.context());
+ }
+ }
+ }
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
new file mode 100644
index 00000000000..cee36859629
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This file has been copied from the Apache ZooKeeper project.
+ * @see <a href=
+ * "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
+ * revision</a>
+ */
+@Category({ IOTests.class, SmallTests.class })
+public class TestFileChangeWatcher {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
+
+ private static File tempDir;
+ private static File tempFile;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
+ private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
+
+ private static final long FS_TIMEOUT = 30000L;
+
+ @BeforeClass
+ public static void createTempFile() throws IOException {
+ tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString())
+ .getCanonicalFile();
+ FileUtils.forceMkdir(tempDir);
+ tempFile = File.createTempFile("zk_test_", "", tempDir);
+ }
+
+ @AfterClass
+ public static void cleanupTempDir() {
+ UTIL.cleanupTestDir();
+ }
+
+ @Test
+ public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
+ FileChangeWatcher watcher = null;
+ try {
+ final List<WatchEvent<?>> events = new ArrayList<>();
+ watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ LOG.info("Got an update: {} {}", event.kind(), event.context());
+ // Filter out the extra ENTRY_CREATE events that are
+ // sometimes seen at the start. Even though we create the watcher
+ // after the file exists, sometimes we still get a create event.
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+ return;
+ }
+ synchronized (events) {
+ events.add(event);
+ events.notifyAll();
+ }
+ });
+ watcher.start();
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+ Thread.sleep(1000L); // TODO hack
+ for (int i = 0; i < 3; i++) {
+ LOG.info("Modifying file, attempt {}", (i + 1));
+ FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
+ true);
+ synchronized (events) {
+ if (events.size() < i + 1) {
+ events.wait(FS_TIMEOUT);
+ }
+ assertEquals("Wrong number of events", i + 1, events.size());
+ WatchEvent<?> event = events.get(i);
+ assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
+ assertEquals(tempFile.getName(), event.context().toString());
+ }
+ }
+ } finally {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+ }
+
+ @Test
+ public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
+ FileChangeWatcher watcher = null;
+ try {
+ final List<WatchEvent<?>> events = new ArrayList<>();
+ watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ LOG.info("Got an update: {} {}", event.kind(), event.context());
+ // Filter out the extra ENTRY_CREATE events that are
+ // sometimes seen at the start. Even though we create the watcher
+ // after the file exists, sometimes we still get a create event.
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+ return;
+ }
+ synchronized (events) {
+ events.add(event);
+ events.notifyAll();
+ }
+ });
+ watcher.start();
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+ Thread.sleep(1000L); // TODO hack
+ LOG.info("Touching file");
+ FileUtils.touch(tempFile);
+ synchronized (events) {
+ if (events.isEmpty()) {
+ events.wait(FS_TIMEOUT);
+ }
+ assertFalse(events.isEmpty());
+ WatchEvent<?> event = events.get(0);
+ assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
+ assertEquals(tempFile.getName(), event.context().toString());
+ }
+ } finally {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+ }
+
+ @Test
+ public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException {
+ FileChangeWatcher watcher = null;
+ try {
+ final List<WatchEvent<?>> events = new ArrayList<>();
+ watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ LOG.info("Got an update: {} {}", event.kind(), event.context());
+ synchronized (events) {
+ events.add(event);
+ events.notifyAll();
+ }
+ });
+ watcher.start();
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+ Thread.sleep(1000L); // TODO hack
+ File tempFile2 = File.createTempFile("zk_test_", "", tempDir);
+ tempFile2.deleteOnExit();
+ synchronized (events) {
+ if (events.isEmpty()) {
+ events.wait(FS_TIMEOUT);
+ }
+ assertFalse(events.isEmpty());
+ WatchEvent<?> event = events.get(0);
+ assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind());
+ assertEquals(tempFile2.getName(), event.context().toString());
+ }
+ } finally {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+ }
+
+ @Test
+ public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException {
+ FileChangeWatcher watcher = null;
+ try {
+ final List<WatchEvent<?>> events = new ArrayList<>();
+ watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ LOG.info("Got an update: {} {}", event.kind(), event.context());
+ // Filter out the extra ENTRY_CREATE events that are
+ // sometimes seen at the start. Even though we create the watcher
+ // after the file exists, sometimes we still get a create event.
+ if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
+ return;
+ }
+ synchronized (events) {
+ events.add(event);
+ events.notifyAll();
+ }
+ });
+ watcher.start();
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+ Thread.sleep(1000L); // TODO hack
+ tempFile.delete();
+ synchronized (events) {
+ if (events.isEmpty()) {
+ events.wait(FS_TIMEOUT);
+ }
+ assertFalse(events.isEmpty());
+ WatchEvent<?> event = events.get(0);
+ assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind());
+ assertEquals(tempFile.getName(), event.context().toString());
+ }
+ } finally {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+ }
+
+ @Test
+ public void testCallbackErrorDoesNotCrashWatcherThread()
+ throws IOException, InterruptedException {
+ FileChangeWatcher watcher = null;
+ try {
+ final AtomicInteger callCount = new AtomicInteger(0);
+ watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ LOG.info("Got an update: {} {}", event.kind(), event.context());
+ int oldValue;
+ synchronized (callCount) {
+ oldValue = callCount.getAndIncrement();
+ callCount.notifyAll();
+ }
+ if (oldValue == 0) {
+ throw new RuntimeException("This error should not crash the watcher thread");
+ }
+ });
+ watcher.start();
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+ Thread.sleep(1000L); // TODO hack
+ LOG.info("Modifying file");
+ FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
+ synchronized (callCount) {
+ while (callCount.get() == 0) {
+ callCount.wait(FS_TIMEOUT);
+ }
+ }
+ LOG.info("Modifying file again");
+ FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
+ synchronized (callCount) {
+ if (callCount.get() == 1) {
+ callCount.wait(FS_TIMEOUT);
+ }
+ }
+ // The value of callCount can exceed 1 only if the callback thread
+ // survives the exception thrown by the first callback.
+ assertTrue(callCount.get() > 1);
+ } finally {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ }
+ }
+ }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
index 0185ebff0ec..ad4ffe0ab5a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java
@@ -57,16 +57,16 @@ public final class X509TestContext {
private final File tempDir;
private final Configuration conf;
- private final X509Certificate trustStoreCertificate;
+ private X509Certificate trustStoreCertificate;
private final char[] trustStorePassword;
- private final KeyPair trustStoreKeyPair;
+ private KeyPair trustStoreKeyPair;
private File trustStoreJksFile;
private File trustStorePemFile;
private File trustStorePkcs12File;
private File trustStoreBcfksFile;
- private final KeyPair keyStoreKeyPair;
- private final X509Certificate keyStoreCertificate;
+ private KeyPair keyStoreKeyPair;
+ private X509Certificate keyStoreCertificate;
private final char[] keyStorePassword;
private File keyStoreJksFile;
private File keyStorePemFile;
@@ -101,16 +101,8 @@ public final class X509TestContext {
this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair);
this.keyStorePassword = requireNonNull(keyStorePassword);
- X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
- caNameBuilder.addRDN(BCStyle.CN,
- MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
- trustStoreCertificate =
- X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+ createCertificates();
- X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
- nameBuilder.addRDN(BCStyle.CN,
- MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
- keyStoreCertificate = newCert(nameBuilder.build());
trustStorePkcs12File = null;
trustStorePemFile = null;
trustStoreJksFile = null;
@@ -197,74 +189,85 @@ public final class X509TestContext {
private File getTrustStoreJksFile() throws IOException {
if (trustStoreJksFile == null) {
- File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
+ trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX,
KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
trustStoreJksFile.deleteOnExit();
- try (
- final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
- byte[] bytes =
- X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
- trustStoreOutputStream.write(bytes);
- trustStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.trustStoreJksFile = trustStoreJksFile;
+ generateTrustStoreJksFile();
}
return trustStoreJksFile;
}
+ private void generateTrustStoreJksFile() throws IOException {
+ try (final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) {
+ byte[] bytes =
+ X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+ trustStoreOutputStream.write(bytes);
+ trustStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
private File getTrustStorePemFile() throws IOException {
if (trustStorePemFile == null) {
- File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
+ trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX,
KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
trustStorePemFile.deleteOnExit();
- FileUtils.writeStringToFile(trustStorePemFile,
- X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
- false);
- this.trustStorePemFile = trustStorePemFile;
+ generateTrustStorePemFile();
}
return trustStorePemFile;
}
+ private void generateTrustStorePemFile() throws IOException {
+ FileUtils.writeStringToFile(trustStorePemFile,
+ X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII,
+ false);
+ }
+
private File getTrustStorePkcs12File() throws IOException {
if (trustStorePkcs12File == null) {
- File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
+ trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX,
KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
trustStorePkcs12File.deleteOnExit();
- try (final FileOutputStream trustStoreOutputStream =
- new FileOutputStream(trustStorePkcs12File)) {
- byte[] bytes =
- X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
- trustStoreOutputStream.write(bytes);
- trustStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.trustStorePkcs12File = trustStorePkcs12File;
+ generateTrustStorePkcs12File();
}
return trustStorePkcs12File;
}
+ private void generateTrustStorePkcs12File() throws IOException {
+ try (
+ final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStorePkcs12File)) {
+ byte[] bytes =
+ X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword);
+ trustStoreOutputStream.write(bytes);
+ trustStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
private File getTrustStoreBcfksFile() throws IOException {
if (trustStoreBcfksFile == null) {
- File trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX,
+ trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX,
KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
trustStoreBcfksFile.deleteOnExit();
- try (
- final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) {
- byte[] bytes =
- X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword);
- trustStoreOutputStream.write(bytes);
- trustStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.trustStoreBcfksFile = trustStoreBcfksFile;
+ generateTrustStoreBcfksFile();
}
return trustStoreBcfksFile;
}
+ private void generateTrustStoreBcfksFile() throws IOException {
+ try (
+ final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) {
+ byte[] bytes =
+ X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword);
+ trustStoreOutputStream.write(bytes);
+ trustStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
public X509Certificate getKeyStoreCertificate() {
return keyStoreCertificate;
}
@@ -307,33 +310,32 @@ public final class X509TestContext {
private File getKeyStoreJksFile() throws IOException {
if (keyStoreJksFile == null) {
- File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
+ keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX,
KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir);
keyStoreJksFile.deleteOnExit();
- try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
- byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
- keyStoreKeyPair.getPrivate(), keyStorePassword);
- keyStoreOutputStream.write(bytes);
- keyStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.keyStoreJksFile = keyStoreJksFile;
+ generateKeyStoreJksFile();
}
return keyStoreJksFile;
}
+ private void generateKeyStoreJksFile() throws IOException {
+ try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) {
+ byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate,
+ keyStoreKeyPair.getPrivate(), keyStorePassword);
+ keyStoreOutputStream.write(bytes);
+ keyStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
private File getKeyStorePemFile() throws IOException {
if (keyStorePemFile == null) {
try {
- File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
+ keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX,
KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir);
keyStorePemFile.deleteOnExit();
- FileUtils.writeStringToFile(keyStorePemFile,
- X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate,
- keyStoreKeyPair.getPrivate(), keyStorePassword),
- StandardCharsets.US_ASCII, false);
- this.keyStorePemFile = keyStorePemFile;
+ generateKeyStorePemFile();
} catch (OperatorCreationException e) {
throw new IOException(e);
}
@@ -341,42 +343,55 @@ public final class X509TestContext {
return keyStorePemFile;
}
+ private void generateKeyStorePemFile() throws IOException, OperatorCreationException {
+ FileUtils.writeStringToFile(keyStorePemFile,
+ X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, keyStoreKeyPair.getPrivate(),
+ keyStorePassword),
+ StandardCharsets.US_ASCII, false);
+ }
+
private File getKeyStorePkcs12File() throws IOException {
if (keyStorePkcs12File == null) {
- File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
+ keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX,
KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir);
keyStorePkcs12File.deleteOnExit();
- try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
- byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
- keyStoreKeyPair.getPrivate(), keyStorePassword);
- keyStoreOutputStream.write(bytes);
- keyStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.keyStorePkcs12File = keyStorePkcs12File;
+ generateKeyStorePkcs12File();
}
return keyStorePkcs12File;
}
+ private void generateKeyStorePkcs12File() throws IOException {
+ try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) {
+ byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate,
+ keyStoreKeyPair.getPrivate(), keyStorePassword);
+ keyStoreOutputStream.write(bytes);
+ keyStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
private File getKeyStoreBcfksFile() throws IOException {
if (keyStoreBcfksFile == null) {
- File keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX,
+ keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX,
KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir);
keyStoreBcfksFile.deleteOnExit();
- try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) {
- byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate,
- keyStoreKeyPair.getPrivate(), keyStorePassword);
- keyStoreOutputStream.write(bytes);
- keyStoreOutputStream.flush();
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
- }
- this.keyStoreBcfksFile = keyStoreBcfksFile;
+ generateKeyStoreBcfksFile();
}
return keyStoreBcfksFile;
}
+ private void generateKeyStoreBcfksFile() throws IOException {
+ try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) {
+ byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate,
+ keyStoreKeyPair.getPrivate(), keyStorePassword);
+ keyStoreOutputStream.write(bytes);
+ keyStoreOutputStream.flush();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
/**
* Sets the SSL system properties such that the given X509Util object can be used to create SSL
* Contexts that will use the trust store and key store files created by this test context.
@@ -445,6 +460,59 @@ public final class X509TestContext {
keyStoreKeyPair, keyStorePassword, cert);
}
+ public void regenerateStores(X509KeyType keyStoreKeyType, X509KeyType trustStoreKeyType,
+ KeyStoreFileType keyStoreFileType, KeyStoreFileType trustStoreFileType)
+ throws GeneralSecurityException, IOException, OperatorCreationException {
+
+ trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType);
+ keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType);
+ createCertificates();
+
+ switch (keyStoreFileType) {
+ case JKS:
+ generateKeyStoreJksFile();
+ break;
+ case PEM:
+ generateKeyStorePemFile();
+ break;
+ case BCFKS:
+ generateKeyStoreBcfksFile();
+ break;
+ case PKCS12:
+ generateKeyStorePkcs12File();
+ break;
+ }
+
+ switch (trustStoreFileType) {
+ case JKS:
+ generateTrustStoreJksFile();
+ break;
+ case PEM:
+ generateTrustStorePemFile();
+ break;
+ case PKCS12:
+ generateTrustStorePkcs12File();
+ break;
+ case BCFKS:
+ generateTrustStoreBcfksFile();
+ break;
+ }
+ }
+
+ private void createCertificates()
+ throws GeneralSecurityException, IOException, OperatorCreationException {
+ X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+ caNameBuilder.addRDN(BCStyle.CN,
+ MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA");
+ trustStoreCertificate =
+ X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair);
+
+ X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE);
+ nameBuilder.addRDN(BCStyle.CN,
+ MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test");
+ keyStoreCertificate = newCert(nameBuilder.build());
+ }
+
/**
* Builder class, used for creating new instances of X509TestContext.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 7de46bb4ea6..4b8aa28ad12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -25,11 +25,13 @@ import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.exceptions.X509Exception;
+import org.apache.hadoop.hbase.io.FileChangeWatcher;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -85,6 +87,9 @@ public class NettyRpcServer extends RpcServer {
private final Channel serverChannel;
final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
private final ByteBufAllocator channelAllocator;
+ private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>();
+ private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
+ private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
@@ -192,6 +197,14 @@ public class NettyRpcServer extends RpcServer {
return;
}
LOG.info("Stopping server on " + this.serverChannel.localAddress());
+ FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
+ if (ks != null) {
+ ks.stop();
+ }
+ FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
+ if (ts != null) {
+ ts.stop();
+ }
if (authTokenSecretMgr != null) {
authTokenSecretMgr.stop();
authTokenSecretMgr = null;
@@ -226,7 +239,7 @@ public class NettyRpcServer extends RpcServer {
private void initSSL(ChannelPipeline p, boolean supportPlaintext)
throws X509Exception, IOException {
- SslContext nettySslContext = X509Util.createSslContextForServer(conf);
+ SslContext nettySslContext = getSslContext();
if (supportPlaintext) {
p.addLast("ssl", new OptionalSslHandler(nettySslContext));
@@ -236,4 +249,22 @@ public class NettyRpcServer extends RpcServer {
LOG.debug("SSL handler added for channel: {}", p.channel());
}
}
+
+ SslContext getSslContext() throws X509Exception, IOException {
+ SslContext result = sslContextForServer.get();
+ if (result == null) {
+ result = X509Util.createSslContextForServer(conf);
+ if (!sslContextForServer.compareAndSet(null, result)) {
+ // lost the race, another thread already set the value
+ result = sslContextForServer.get();
+ } else if (
+ keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
+ && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
+ ) {
+ X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
+ () -> sslContextForServer.set(null));
+ }
+ }
+ return result;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java
new file mode 100644
index 00000000000..72fc7141680
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseServerBase;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext;
+import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyTLSIPCFileWatcher {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestNettyTLSIPCFileWatcher.class);
+
+ private static final Configuration CONF = HBaseConfiguration.create();
+ private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(CONF);
+ private static HBaseServerBase<?> SERVER;
+ private static X509TestContextProvider PROVIDER;
+ private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
+
+ private X509TestContext x509TestContext;
+
+ @Parameterized.Parameter(0)
+ public X509KeyType keyType;
+
+ @Parameterized.Parameter(1)
+ public KeyStoreFileType storeFileType;
+
+ @Parameterized.Parameters(name = "{index}: keyType={0}, storeFileType={1}")
+ public static List<Object[]> data() {
+ List<Object[]> params = new ArrayList<>();
+ for (X509KeyType caKeyType : X509KeyType.values()) {
+ for (KeyStoreFileType ks : KeyStoreFileType.values()) {
+ params.add(new Object[] { caKeyType, ks });
+ }
+ }
+ return params;
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws IOException {
+ Security.addProvider(new BouncyCastleProvider());
+ File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString())
+ .getCanonicalFile();
+ FileUtils.forceMkdir(dir);
+ // server must enable tls
+ CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true);
+ PROVIDER = new X509TestContextProvider(CONF, dir);
+ EVENT_LOOP_GROUP_CONFIG =
+ NettyEventLoopGroupConfig.setup(CONF, TestNettyTlsIPC.class.getSimpleName());
+ SERVER = mock(HBaseServerBase.class);
+ when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws InterruptedException {
+ Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
+ EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync();
+ UTIL.cleanupTestDir();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray());
+ x509TestContext.setConfigurations(storeFileType, storeFileType);
+ CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false);
+ CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true);
+ CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true);
+ }
+
+ @After
+ public void tearDown() {
+ x509TestContext.clearConfigurations();
+ x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP);
+ x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR);
+ x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL);
+ System.clearProperty("com.sun.net.ssl.checkRevocation");
+ System.clearProperty("com.sun.security.enableCRLDP");
+ Security.setProperty("ocsp.enable", Boolean.FALSE.toString());
+ Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString());
+ }
+
+ @Test
+ public void testReplaceServerKeystore()
+ throws IOException, ServiceException, GeneralSecurityException, OperatorCreationException {
+ Configuration clientConf = new Configuration(CONF);
+ RpcServer rpcServer = createRpcServer("testRpcServer",
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+
+ try {
+ rpcServer.start();
+
+ try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ newBlockingStub(client, rpcServer.getListenerAddress());
+ HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+ String message = "hello";
+ assertEquals(message,
+ stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ assertNull(pcrc.cellScanner());
+ }
+
+ // Replace keystore
+ x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
+
+ try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ newBlockingStub(client, rpcServer.getListenerAddress());
+ HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+ String message = "hello";
+ assertEquals(message,
+ stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ assertNull(pcrc.cellScanner());
+ }
+
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ @Test
+ public void testReplaceClientAndServerKeystore()
+ throws GeneralSecurityException, IOException, OperatorCreationException, ServiceException {
+ Configuration clientConf = new Configuration(CONF);
+ RpcServer rpcServer = createRpcServer("testRpcServer",
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+
+ try {
+ rpcServer.start();
+
+ try (AbstractRpcClient<?> client = new NettyRpcClient(clientConf)) {
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ newBlockingStub(client, rpcServer.getListenerAddress());
+ HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+ String message = "hello";
+ assertEquals(message,
+ stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ assertNull(pcrc.cellScanner());
+
+ // Replace keystore and cancel client connections
+ x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType);
+ client.cancelConnections(
+ ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L));
+
+ assertEquals(message,
+ stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ assertNull(pcrc.cellScanner());
+ }
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ private RpcServer createRpcServer(String name,
+ List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
+ Configuration conf, RpcScheduler scheduler) throws IOException {
+ return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
+ }
+}