You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/03/31 17:46:53 UTC

[hadoop] branch trunk updated (a2975d2 -> 2c482fb)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from a2975d2  HADOOP-16524. Automatic keystore reloading for HttpServer2 Reapply of issue reverted first because it caused yarn failures and then again because the commit message was incorrectly formatted.
     new 22961a6  Revert "HADOOP-16524. Automatic keystore reloading for HttpServer2"
     new 2c482fb  HADOOP-16524. Automatic keystore reloading for HttpServer2

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: Revert "HADOOP-16524. Automatic keystore reloading for HttpServer2"

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 22961a615df5d4764e36e8d0d089ece9b2ecbab5
Author: stack <st...@apache.org>
AuthorDate: Wed Mar 31 10:43:09 2021 -0700

    Revert "HADOOP-16524. Automatic keystore reloading for HttpServer2"
    
    This reverts commit a2975d215371bc693363683338bfa42a0cd5b6d9.
---
 .../java/org/apache/hadoop/http/HttpServer2.java   |  62 +-----
 .../security/ssl/FileBasedKeyStoresFactory.java    | 222 ++++++++-------------
 .../security/ssl/FileMonitoringTimerTask.java      |  85 --------
 .../security/ssl/ReloadingX509KeystoreManager.java | 157 ---------------
 .../security/ssl/ReloadingX509TrustManager.java    |  95 +++++++--
 .../security/ssl/TestReloadingX509KeyManager.java  | 205 -------------------
 .../ssl/TestReloadingX509TrustManager.java         |  77 +++----
 .../hadoop/hdfs/web/TestURLConnectionFactory.java  |   3 +-
 .../yarn/client/api/impl/TestTimelineClient.java   |   3 +-
 hadoop-yarn-project/hadoop-yarn/pom.xml            |   3 +-
 10 files changed, 197 insertions(+), 715 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
index dde27db..7534cba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
@@ -27,17 +27,14 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
+import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Enumeration;
-import java.util.Arrays;
-import java.util.Timer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -78,8 +75,6 @@ import org.apache.hadoop.security.authentication.server.ProxyUserAuthenticationF
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
-import org.apache.hadoop.security.ssl.FileMonitoringTimerTask;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
@@ -191,7 +186,6 @@ public final class HttpServer2 implements FilterContainer {
   static final String STATE_DESCRIPTION_ALIVE = " - alive";
   static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
   private final SignerSecretProvider secretProvider;
-  private final Optional<java.util.Timer> configurationChangeMonitor;
   private XFrameOption xFrameOption;
   private boolean xFrameOptionIsEnabled;
   public static final String HTTP_HEADER_PREFIX = "hadoop.http.header.";
@@ -250,8 +244,6 @@ public final class HttpServer2 implements FilterContainer {
 
     private boolean sniHostCheckEnabled;
 
-    private Optional<Timer> configurationChangeMonitor = Optional.empty();
-
     public Builder setName(String name){
       this.name = name;
       return this;
@@ -582,45 +574,12 @@ public final class HttpServer2 implements FilterContainer {
       }
 
       setEnabledProtocols(sslContextFactory);
-
-      long storesReloadInterval =
-          conf.getLong(FileBasedKeyStoresFactory.SSL_STORES_RELOAD_INTERVAL_TPL_KEY,
-              FileBasedKeyStoresFactory.DEFAULT_SSL_STORES_RELOAD_INTERVAL);
-
-      if (storesReloadInterval > 0) {
-        this.configurationChangeMonitor = Optional.of(
-            this.makeConfigurationChangeMonitor(storesReloadInterval, sslContextFactory));
-      }
-
       conn.addFirstConnectionFactory(new SslConnectionFactory(sslContextFactory,
           HttpVersion.HTTP_1_1.asString()));
 
       return conn;
     }
 
-    private Timer makeConfigurationChangeMonitor(long reloadInterval,
-        SslContextFactory.Server sslContextFactory) {
-      java.util.Timer timer = new java.util.Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
-      //
-      // The Jetty SSLContextFactory provides a 'reload' method which will reload both
-      // truststore and keystore certificates.
-      //
-      timer.schedule(new FileMonitoringTimerTask(
-              Paths.get(keyStore),
-              path -> {
-                LOG.info("Reloading certificates from store keystore " + keyStore);
-                try {
-                  sslContextFactory.reload(factory -> { });
-                } catch (Exception ex) {
-                  LOG.error("Failed to reload SSL keystore certificates", ex);
-                }
-              },null),
-          reloadInterval,
-          reloadInterval
-      );
-      return timer;
-    }
-
     private void setEnabledProtocols(SslContextFactory sslContextFactory) {
       String enabledProtocols = conf.get(SSLFactory.SSL_ENABLED_PROTOCOLS_KEY,
           SSLFactory.SSL_ENABLED_PROTOCOLS_DEFAULT);
@@ -663,7 +622,6 @@ public final class HttpServer2 implements FilterContainer {
     this.webAppContext = createWebAppContext(b, adminsAcl, appDir);
     this.xFrameOptionIsEnabled = b.xFrameEnabled;
     this.xFrameOption = b.xFrameOption;
-    this.configurationChangeMonitor = b.configurationChangeMonitor;
 
     try {
       this.secretProvider =
@@ -1462,16 +1420,6 @@ public final class HttpServer2 implements FilterContainer {
    */
   public void stop() throws Exception {
     MultiException exception = null;
-    if (this.configurationChangeMonitor.isPresent()) {
-      try {
-        this.configurationChangeMonitor.get().cancel();
-      } catch (Exception e) {
-        LOG.error(
-            "Error while canceling configuration monitoring timer for webapp"
-                + webAppContext.getDisplayName(), e);
-        exception = addMultiException(exception, e);
-      }
-    }
     for (ServerConnector c : listeners) {
       try {
         c.close();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
index 236d881..b184e4a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
@@ -29,20 +29,20 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManager;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.text.MessageFormat;
-import java.util.Timer;
 
 /**
  * {@link KeyStoresFactory} implementation that reads the certificates from
  * keystore files.
  * <p>
- * If either the truststore or the keystore certificates file changes, it
- * would be refreshed under the corresponding wrapper implementation -
- * {@link ReloadingX509KeystoreManager} or {@link ReloadingX509TrustManager}.
- * </p>
+ * if the trust certificates keystore file changes, the {@link TrustManager}
+ * is refreshed with the new trust certificate entries (using a
+ * {@link ReloadingX509TrustManager} trustmanager).
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -51,19 +51,6 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(FileBasedKeyStoresFactory.class);
 
-
-  /**
-   * The name of the timer thread monitoring file changes.
-   */
-  public static final String SSL_MONITORING_THREAD_NAME = "SSL Certificates Store Monitor";
-
-  /**
-   * The refresh interval used to check if either of the truststore or keystore
-   * certificate file has changed.
-   */
-  public static final String SSL_STORES_RELOAD_INTERVAL_TPL_KEY =
-    "ssl.{0}.stores.reload.interval";
-
   public static final String SSL_KEYSTORE_LOCATION_TPL_KEY =
     "ssl.{0}.keystore.location";
   public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY =
@@ -90,119 +77,14 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   public static final String DEFAULT_KEYSTORE_TYPE = "jks";
 
   /**
-   * The default time interval in milliseconds used to check if either
-   * of the truststore or keystore certificates file has changed and needs reloading.
+   * Reload interval in milliseconds.
    */
-  public static final int DEFAULT_SSL_STORES_RELOAD_INTERVAL = 10000;
+  public static final int DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL = 10000;
 
   private Configuration conf;
   private KeyManager[] keyManagers;
   private TrustManager[] trustManagers;
   private ReloadingX509TrustManager trustManager;
-  private Timer fileMonitoringTimer;
-
-
-  private void createTrustManagersFromConfiguration(SSLFactory.Mode mode,
-                                                    String truststoreType,
-                                                    String truststoreLocation,
-                                                    long storesReloadInterval)
-      throws IOException, GeneralSecurityException {
-    String passwordProperty = resolvePropertyName(mode,
-        SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
-    String truststorePassword = getPassword(conf, passwordProperty, "");
-    if (truststorePassword.isEmpty()) {
-      // An empty trust store password is legal; the trust store password
-      // is only required when writing to a trust store. Otherwise it's
-      // an optional integrity check.
-      truststorePassword = null;
-    }
-
-    // Check if obsolete truststore specific reload interval is present for backward compatible
-    long truststoreReloadInterval =
-        conf.getLong(
-            resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
-            storesReloadInterval);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation +
-          ", reloading at " + truststoreReloadInterval + " millis.");
-    }
-
-    trustManager = new ReloadingX509TrustManager(
-        truststoreType,
-        truststoreLocation,
-        truststorePassword);
-
-    if (truststoreReloadInterval > 0) {
-      fileMonitoringTimer.schedule(
-          new FileMonitoringTimerTask(
-              Paths.get(truststoreLocation),
-              path -> trustManager.loadFrom(path),
-              exception -> LOG.error(ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE, exception)),
-          truststoreReloadInterval,
-          truststoreReloadInterval);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
-    }
-    trustManagers = new TrustManager[]{trustManager};
-  }
-
-  /**
-   * Implements logic of initializing the KeyManagers with the options
-   * to reload keystores.
-   * @param mode client or server
-   * @param keystoreType The keystore type.
-   * @param storesReloadInterval The interval to check if the keystore certificates
-   *                             file has changed.
-   */
-  private void createKeyManagersFromConfiguration(SSLFactory.Mode mode,
-                                                  String keystoreType, long storesReloadInterval)
-      throws GeneralSecurityException, IOException {
-    String locationProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
-    String keystoreLocation = conf.get(locationProperty, "");
-    if (keystoreLocation.isEmpty()) {
-      throw new GeneralSecurityException("The property '" + locationProperty +
-          "' has not been set in the ssl configuration file.");
-    }
-    String passwordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
-    String keystorePassword = getPassword(conf, passwordProperty, "");
-    if (keystorePassword.isEmpty()) {
-      throw new GeneralSecurityException("The property '" + passwordProperty +
-          "' has not been set in the ssl configuration file.");
-    }
-    String keyPasswordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
-    // Key password defaults to the same value as store password for
-    // compatibility with legacy configurations that did not use a separate
-    // configuration property for key password.
-    String keystoreKeyPassword = getPassword(
-        conf, keyPasswordProperty, keystorePassword);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
-    }
-
-    ReloadingX509KeystoreManager keystoreManager =  new ReloadingX509KeystoreManager(
-        keystoreType,
-        keystoreLocation,
-        keystorePassword,
-        keystoreKeyPassword);
-
-    if (storesReloadInterval > 0) {
-      fileMonitoringTimer.schedule(
-          new FileMonitoringTimerTask(
-              Paths.get(keystoreLocation),
-              path -> keystoreManager.loadFrom(path),
-              exception -> LOG.error(ReloadingX509KeystoreManager.RELOAD_ERROR_MESSAGE, exception)),
-          storesReloadInterval,
-          storesReloadInterval);
-    }
-
-    keyManagers = new KeyManager[] { keystoreManager };
-  }
 
   /**
    * Resolves a property name to its client/server version if applicable.
@@ -257,28 +139,56 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       conf.getBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY,
           SSLFactory.SSL_REQUIRE_CLIENT_CERT_DEFAULT);
 
-    long storesReloadInterval = conf.getLong(
-        resolvePropertyName(mode, SSL_STORES_RELOAD_INTERVAL_TPL_KEY),
-        DEFAULT_SSL_STORES_RELOAD_INTERVAL);
-
-    fileMonitoringTimer = new Timer(SSL_MONITORING_THREAD_NAME, true);
-
     // certificate store
     String keystoreType =
-        conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
-                 DEFAULT_KEYSTORE_TYPE);
-
+      conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
+               DEFAULT_KEYSTORE_TYPE);
+    KeyStore keystore = KeyStore.getInstance(keystoreType);
+    String keystoreKeyPassword = null;
     if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
-      createKeyManagersFromConfiguration(mode, keystoreType, storesReloadInterval);
+      String locationProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
+      String keystoreLocation = conf.get(locationProperty, "");
+      if (keystoreLocation.isEmpty()) {
+        throw new GeneralSecurityException("The property '" + locationProperty +
+          "' has not been set in the ssl configuration file.");
+      }
+      String passwordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
+      String keystorePassword = getPassword(conf, passwordProperty, "");
+      if (keystorePassword.isEmpty()) {
+        throw new GeneralSecurityException("The property '" + passwordProperty +
+          "' has not been set in the ssl configuration file.");
+      }
+      String keyPasswordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
+      // Key password defaults to the same value as store password for
+      // compatibility with legacy configurations that did not use a separate
+      // configuration property for key password.
+      keystoreKeyPassword = getPassword(
+          conf, keyPasswordProperty, keystorePassword);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
+      }
+
+      InputStream is = Files.newInputStream(Paths.get(keystoreLocation));
+      try {
+        keystore.load(is, keystorePassword.toCharArray());
+      } finally {
+        is.close();
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(mode.toString() + " Loaded KeyStore: " + keystoreLocation);
+      }
     } else {
-      KeyStore keystore = KeyStore.getInstance(keystoreType);
       keystore.load(null, null);
-      KeyManagerFactory keyMgrFactory = KeyManagerFactory
-              .getInstance(SSLFactory.SSLCERTIFICATE);
-
-      keyMgrFactory.init(keystore, null);
-      keyManagers = keyMgrFactory.getKeyManagers();
     }
+    KeyManagerFactory keyMgrFactory = KeyManagerFactory
+        .getInstance(SSLFactory.SSLCERTIFICATE);
+      
+    keyMgrFactory.init(keystore, (keystoreKeyPassword != null) ?
+                                 keystoreKeyPassword.toCharArray() : null);
+    keyManagers = keyMgrFactory.getKeyManagers();
 
     //trust store
     String truststoreType =
@@ -289,7 +199,33 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       resolvePropertyName(mode, SSL_TRUSTSTORE_LOCATION_TPL_KEY);
     String truststoreLocation = conf.get(locationProperty, "");
     if (!truststoreLocation.isEmpty()) {
-      createTrustManagersFromConfiguration(mode, truststoreType, truststoreLocation, storesReloadInterval);
+      String passwordProperty = resolvePropertyName(mode,
+          SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
+      String truststorePassword = getPassword(conf, passwordProperty, "");
+      if (truststorePassword.isEmpty()) {
+        // An empty trust store password is legal; the trust store password
+        // is only required when writing to a trust store. Otherwise it's
+        // an optional integrity check.
+        truststorePassword = null;
+      }
+      long truststoreReloadInterval =
+          conf.getLong(
+              resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
+              DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation);
+      }
+
+      trustManager = new ReloadingX509TrustManager(truststoreType,
+          truststoreLocation,
+          truststorePassword,
+          truststoreReloadInterval);
+      trustManager.init();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
+      }
+      trustManagers = new TrustManager[]{trustManager};
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The property '" + locationProperty + "' has not been set, " +
@@ -320,7 +256,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   @Override
   public synchronized void destroy() {
     if (trustManager != null) {
-      fileMonitoringTimer.cancel();
+      trustManager.destroy();
       trustManager = null;
       keyManagers = null;
       trustManagers = null;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
deleted file mode 100644
index 40b6197..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security.ssl;
-
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.file.Path;
-import java.util.TimerTask;
-import java.util.function.Consumer;
-
-/**
- * Implements basic logic to track when a file changes on disk and call the action
- * passed to the constructor when it does. An exception handler can optionally also be specified
- * in the constructor, otherwise any exception occurring during process will be logged
- * using this class' logger.
- */
-@InterfaceAudience.Private
-public class FileMonitoringTimerTask extends TimerTask {
-
-  static final Logger LOG = LoggerFactory.getLogger(FileMonitoringTimerTask.class);
-
-  @VisibleForTesting
-  static final String PROCESS_ERROR_MESSAGE =
-      "Could not process file change : ";
-
-  final private Path filePath;
-  final private Consumer<Path> onFileChange;
-  final Consumer<Throwable> onChangeFailure;
-  private long lastProcessed;
-
-  /**
-   * Create file monitoring task to be scheduled using a standard Java {@link java.util.Timer}
-   * instance.
-   *
-   * @param filePath The path to the file to monitor.
-   * @param onFileChange The function to call when the file has changed.
-   * @param onChangeFailure The function to call when an exception is thrown during the
-   *                       file change processing.
-   */
-  public FileMonitoringTimerTask(Path filePath, Consumer<Path> onFileChange,
-      Consumer<Throwable> onChangeFailure) {
-    Preconditions.checkNotNull(filePath, "path to monitor disk file is not set");
-    Preconditions.checkNotNull(onFileChange, "action to monitor disk file is not set");
-
-    this.filePath = filePath;
-    this.lastProcessed = filePath.toFile().lastModified();
-    this.onFileChange = onFileChange;
-    this.onChangeFailure = onChangeFailure;
-  }
-
-  @Override
-  public void run() {
-    if (lastProcessed != filePath.toFile().lastModified()) {
-      try {
-        onFileChange.accept(filePath);
-      } catch (Throwable t) {
-        if (onChangeFailure  != null) {
-          onChangeFailure.accept(t);
-        } else {
-          LOG.error(PROCESS_ERROR_MESSAGE + filePath.toString(), t);
-        }
-      }
-      lastProcessed = filePath.toFile().lastModified();
-    }
-  }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
deleted file mode 100644
index 72e8b6b..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security.ssl;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.*;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.security.Principal;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * An implementation of <code>X509KeyManager</code> that exposes a method,
- * {@link #loadFrom(Path)} to reload its configuration. Note that it is necessary
- * to implement the <code>X509ExtendedKeyManager</code> to properly delegate
- * the additional methods, otherwise the SSL handshake will fail.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ReloadingX509KeystoreManager extends X509ExtendedKeyManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
-
-  static final String RELOAD_ERROR_MESSAGE =
-      "Could not load keystore (keep using existing one) : ";
-
-  final private String type;
-  final private String storePassword;
-  final private String keyPassword;
-  private AtomicReference<X509ExtendedKeyManager> keyManagerRef;
-
-  /**
-   * Construct a <code>Reloading509KeystoreManager</code>
-   *
-   * @param type type of keystore file, typically 'jks'.
-   * @param location local path to the keystore file.
-   * @param storePassword password of the keystore file.
-   * @param keyPassword The password of the key.
-   * @throws IOException
-   * @throws GeneralSecurityException
-   */
-  public ReloadingX509KeystoreManager(String type, String location,
-                                      String storePassword, String keyPassword)
-      throws IOException, GeneralSecurityException {
-    this.type = type;
-    this.storePassword = storePassword;
-    this.keyPassword = keyPassword;
-    keyManagerRef = new AtomicReference<X509ExtendedKeyManager>();
-    keyManagerRef.set(loadKeyManager(Paths.get(location)));
-  }
-
-  @Override
-  public String chooseEngineClientAlias(String[] strings, Principal[] principals,
-      SSLEngine sslEngine) {
-    return keyManagerRef.get().chooseEngineClientAlias(strings, principals, sslEngine);
-  }
-
-  @Override
-  public String chooseEngineServerAlias(String s, Principal[] principals,
-      SSLEngine sslEngine) {
-    return keyManagerRef.get().chooseEngineServerAlias(s, principals, sslEngine);
-  }
-
-  @Override
-  public String[] getClientAliases(String s, Principal[] principals) {
-    return keyManagerRef.get().getClientAliases(s, principals);
-  }
-
-  @Override
-  public String chooseClientAlias(String[] strings, Principal[] principals,
-      Socket socket) {
-    return keyManagerRef.get().chooseClientAlias(strings, principals, socket);
-  }
-
-  @Override
-  public String[] getServerAliases(String s, Principal[] principals) {
-    return keyManagerRef.get().getServerAliases(s, principals);
-  }
-
-  @Override
-  public String chooseServerAlias(String s, Principal[] principals,
-      Socket socket) {
-    return keyManagerRef.get().chooseServerAlias(s, principals, socket);
-  }
-
-  @Override
-  public X509Certificate[] getCertificateChain(String s) {
-    return keyManagerRef.get().getCertificateChain(s);
-  }
-
-  @Override
-  public PrivateKey getPrivateKey(String s) {
-    return keyManagerRef.get().getPrivateKey(s);
-  }
-
-  public ReloadingX509KeystoreManager loadFrom(Path path) {
-    try {
-      this.keyManagerRef.set(loadKeyManager(path));
-    } catch (Exception ex) {
-      // The Consumer.accept interface forces us to convert to unchecked
-      throw new RuntimeException(ex);
-    }
-    return this;
-  }
-
-  private X509ExtendedKeyManager loadKeyManager(Path path)
-      throws IOException, GeneralSecurityException {
-
-    X509ExtendedKeyManager keyManager = null;
-    KeyStore keystore = KeyStore.getInstance(type);
-
-    try (InputStream is = Files.newInputStream(path)) {
-      keystore.load(is, this.storePassword.toCharArray());
-    }
-
-    LOG.debug(" Loaded KeyStore: " + path.toFile().getAbsolutePath());
-
-    KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance(
-        SSLFactory.SSLCERTIFICATE);
-    keyMgrFactory.init(keystore,
-        (keyPassword != null) ? keyPassword.toCharArray() : null);
-    for (KeyManager candidate: keyMgrFactory.getKeyManagers()) {
-      if (candidate instanceof X509ExtendedKeyManager) {
-        keyManager = (X509ExtendedKeyManager)candidate;
-        break;
-      }
-    }
-    return keyManager;
-  }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
index 68fd4c1..7430477 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
@@ -32,8 +32,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.cert.CertificateException;
@@ -41,23 +39,31 @@ import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A {@link TrustManager} implementation that exposes a method, {@link #loadFrom(Path)}
- * to reload its configuration for example when the truststore file on disk changes.
+ * A {@link TrustManager} implementation that reloads its configuration when
+ * the truststore file on disk changes.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class ReloadingX509TrustManager implements X509TrustManager {
+public final class ReloadingX509TrustManager
+  implements X509TrustManager, Runnable {
 
+  @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(ReloadingX509TrustManager.class);
-
+  @VisibleForTesting
   static final String RELOAD_ERROR_MESSAGE =
       "Could not load truststore (keep using existing one) : ";
 
   private String type;
+  private File file;
   private String password;
+  private long lastLoaded;
+  private long reloadInterval;
   private AtomicReference<X509TrustManager> trustManagerRef;
 
+  private volatile boolean running;
+  private Thread reloader;
+
   /**
    * Creates a reloadable trustmanager. The trustmanager reloads itself
    * if the underlying trustore file has changed.
@@ -65,18 +71,49 @@ public final class ReloadingX509TrustManager implements X509TrustManager {
    * @param type type of truststore file, typically 'jks'.
    * @param location local path to the truststore file.
    * @param password password of the truststore file.
+   * @param reloadInterval interval to check if the truststore file has
    * changed, in milliseconds.
    * @throws IOException thrown if the truststore could not be initialized due
    * to an IO error.
    * @throws GeneralSecurityException thrown if the truststore could not be
    * initialized due to a security error.
    */
-  public ReloadingX509TrustManager(String type, String location, String password)
+  public ReloadingX509TrustManager(String type, String location,
+                                   String password, long reloadInterval)
     throws IOException, GeneralSecurityException {
     this.type = type;
+    file = new File(location);
     this.password = password;
     trustManagerRef = new AtomicReference<X509TrustManager>();
-    trustManagerRef.set(loadTrustManager(Paths.get(location)));
+    trustManagerRef.set(loadTrustManager());
+    this.reloadInterval = reloadInterval;
+  }
+
+  /**
+   * Starts the reloader thread.
+   */
+  public void init() {
+    reloader = new Thread(this, "Truststore reloader thread");
+    reloader.setDaemon(true);
+    running =  true;
+    reloader.start();
+  }
+
+  /**
+   * Stops the reloader thread.
+   */
+  public void destroy() {
+    running = false;
+    reloader.interrupt();
+  }
+
+  /**
+   * Returns the reload check interval.
+   *
+   * @return the reload check interval, in milliseconds.
+   */
+  public long getReloadInterval() {
+    return reloadInterval;
   }
 
   @Override
@@ -114,24 +151,27 @@ public final class ReloadingX509TrustManager implements X509TrustManager {
     return issuers;
   }
 
-  public ReloadingX509TrustManager loadFrom(Path path) {
-    try {
-      this.trustManagerRef.set(loadTrustManager(path));
-    } catch (Exception ex) {
-      // The Consumer.accept interface forces us to convert to unchecked
-      throw new RuntimeException(RELOAD_ERROR_MESSAGE, ex);
+  boolean needsReload() {
+    boolean reload = true;
+    if (file.exists()) {
+      if (file.lastModified() == lastLoaded) {
+        reload = false;
+      }
+    } else {
+      lastLoaded = 0;
     }
-    return this;
+    return reload;
   }
 
-  X509TrustManager loadTrustManager(Path path)
+  X509TrustManager loadTrustManager()
   throws IOException, GeneralSecurityException {
     X509TrustManager trustManager = null;
     KeyStore ks = KeyStore.getInstance(type);
-    InputStream in = Files.newInputStream(path);
+    InputStream in = Files.newInputStream(file.toPath());
     try {
       ks.load(in, (password == null) ? null : password.toCharArray());
-      LOG.debug("Loaded truststore '" + path + "'");
+      lastLoaded = file.lastModified();
+      LOG.debug("Loaded truststore '" + file + "'");
     } finally {
       in.close();
     }
@@ -148,4 +188,23 @@ public final class ReloadingX509TrustManager implements X509TrustManager {
     }
     return trustManager;
   }
+
+  @Override
+  public void run() {
+    while (running) {
+      try {
+        Thread.sleep(reloadInterval);
+      } catch (InterruptedException e) {
+        //NOP
+      }
+      if (running && needsReload()) {
+        try {
+          trustManagerRef.set(loadTrustManager());
+        } catch (Exception ex) {
+          LOG.warn(RELOAD_ERROR_MESSAGE + ex.toString(), ex);
+        }
+      }
+    }
+  }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
deleted file mode 100644
index bf0a6ab..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security.ssl;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Supplier;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Paths;
-import java.security.KeyPair;
-import java.security.cert.X509Certificate;
-import java.util.Timer;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-public class TestReloadingX509KeyManager {
-
-    private static final String BASEDIR = GenericTestUtils.getTempPath(
-            TestReloadingX509TrustManager.class.getSimpleName());
-
-    private final GenericTestUtils.LogCapturer reloaderLog = GenericTestUtils.LogCapturer.captureLogs(
-            FileMonitoringTimerTask.LOG);
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        File base = new File(BASEDIR);
-        FileUtil.fullyDelete(base);
-        base.mkdirs();
-    }
-
-    @Test(expected = IOException.class)
-    public void testLoadMissingKeyStore() throws Exception {
-        String keystoreLocation = BASEDIR + "/testmissing.jks";
-
-        ReloadingX509KeystoreManager tm =
-                new ReloadingX509KeystoreManager("jks", keystoreLocation,
-                        "password",
-                        "password");
-    }
-
-    @Test(expected = IOException.class)
-    public void testLoadCorruptKeyStore() throws Exception {
-        String keystoreLocation = BASEDIR + "/testcorrupt.jks";
-        OutputStream os = new FileOutputStream(keystoreLocation);
-        os.write(1);
-        os.close();
-
-        ReloadingX509KeystoreManager tm =
-                new ReloadingX509KeystoreManager("jks", keystoreLocation,
-                        "password",
-                        "password");
-    }
-
-    @Test (timeout = 3000000)
-    public void testReload() throws Exception {
-        KeyPair kp = generateKeyPair("RSA");
-        X509Certificate sCert = generateCertificate("CN=localhost, O=server", kp, 30,
-                        "SHA1withRSA");
-        String keystoreLocation = BASEDIR + "/testreload.jks";
-        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), sCert);
-
-        long reloadInterval = 10;
-        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
-        ReloadingX509KeystoreManager tm =
-                new ReloadingX509KeystoreManager("jks", keystoreLocation,
-                        "password",
-                        "password");
-        try {
-            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
-            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
-
-            // Wait so that the file modification time is different
-            Thread.sleep((reloadInterval+ 1000));
-
-            // Change the certificate with a new keypair
-            final KeyPair anotherKP = generateKeyPair("RSA");
-            sCert = KeyStoreTestUtil.generateCertificate("CN=localhost, O=server", anotherKP, 30,
-                            "SHA1withRSA");
-            createKeyStore(keystoreLocation, "password", "cert1", anotherKP.getPrivate(), sCert);
-
-            GenericTestUtils.waitFor(new Supplier<Boolean>() {
-                @Override
-                public Boolean get() {
-                    return tm.getPrivateKey("cert1").equals(kp.getPrivate());
-                }
-            }, (int) reloadInterval, 100000);
-        } finally {
-            fileMonitoringTimer.cancel();
-        }
-    }
-
-    @Test (timeout = 30000)
-    public void testReloadMissingTrustStore() throws Exception {
-        KeyPair kp = generateKeyPair("RSA");
-        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
-        String keystoreLocation = BASEDIR + "/testmissing.jks";
-        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
-
-        long reloadInterval = 10;
-        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
-        ReloadingX509KeystoreManager tm =
-                new ReloadingX509KeystoreManager("jks", keystoreLocation,
-                        "password",
-                        "password");
-        try {
-            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
-            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
-
-            assertFalse(reloaderLog.getOutput().contains(
-                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
-
-            // Wait for the first reload to happen so we actually detect a change after the delete
-            Thread.sleep((reloadInterval+ 1000));
-
-            new File(keystoreLocation).delete();
-
-            // Wait for the reload to happen and log to get written to
-            Thread.sleep((reloadInterval+ 1000));
-
-            waitForFailedReloadAtLeastOnce((int) reloadInterval);
-
-            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
-        } finally {
-            reloaderLog.stopCapturing();
-            fileMonitoringTimer.cancel();
-        }
-    }
-
-
-    @Test (timeout = 30000)
-    public void testReloadCorruptTrustStore() throws Exception {
-        KeyPair kp = generateKeyPair("RSA");
-        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
-        String keystoreLocation = BASEDIR + "/testmissing.jks";
-        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
-
-        long reloadInterval = 10;
-        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
-        ReloadingX509KeystoreManager tm =
-                new ReloadingX509KeystoreManager("jks", keystoreLocation,
-                        "password",
-                        "password");
-        try {
-            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
-            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
-
-            // Wait so that the file modification time is different
-            Thread.sleep((reloadInterval + 1000));
-
-            assertFalse(reloaderLog.getOutput().contains(
-                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
-            OutputStream os = new FileOutputStream(keystoreLocation);
-            os.write(1);
-            os.close();
-
-            waitForFailedReloadAtLeastOnce((int) reloadInterval);
-
-            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
-        } finally {
-            reloaderLog.stopCapturing();
-            fileMonitoringTimer.cancel();
-        }
-    }
-
-    /**Wait for the reloader thread to load the configurations at least once
-     * by probing the log of the thread if the reload fails.
-     */
-    private void waitForFailedReloadAtLeastOnce(int reloadInterval)
-            throws InterruptedException, TimeoutException {
-        GenericTestUtils.waitFor(new Supplier<Boolean>() {
-            @Override
-            public Boolean get() {
-                return reloaderLog.getOutput().contains(
-                        FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
-            }
-        }, reloadInterval, 10 * 1000);
-    }
-}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
index 6358959..441f552 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
@@ -30,12 +30,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Timer;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -52,7 +50,7 @@ public class TestReloadingX509TrustManager {
   private X509Certificate cert1;
   private X509Certificate cert2;
   private final LogCapturer reloaderLog = LogCapturer.captureLogs(
-      FileMonitoringTimerTask.LOG);
+      ReloadingX509TrustManager.LOG);
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -66,7 +64,12 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
 
     ReloadingX509TrustManager tm =
-            new ReloadingX509TrustManager("jks", truststoreLocation, "password");
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+    try {
+      tm.init();
+    } finally {
+      tm.destroy();
+    }
   }
 
   @Test(expected = IOException.class)
@@ -77,7 +80,12 @@ public class TestReloadingX509TrustManager {
     os.close();
 
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+    try {
+      tm.init();
+    } finally {
+      tm.destroy();
+    }
   }
 
   @Test (timeout = 30000)
@@ -88,17 +96,14 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
-    long reloadInterval = 10;
-    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     final ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
     try {
-      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+      tm.init();
       assertEquals(1, tm.getAcceptedIssuers().length);
 
       // Wait so that the file modification time is different
-      Thread.sleep((reloadInterval+ 1000));
+      Thread.sleep((tm.getReloadInterval() + 1000));
 
       // Add another cert
       Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
@@ -111,9 +116,9 @@ public class TestReloadingX509TrustManager {
         public Boolean get() {
           return tm.getAcceptedIssuers().length == 2;
         }
-      }, (int) reloadInterval, 100000);
+      }, (int) tm.getReloadInterval(), 10000);
     } finally {
-      fileMonitoringTimer.cancel();
+      tm.destroy();
     }
   }
 
@@ -125,38 +130,27 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
-    long reloadInterval = 10;
-    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
     try {
-      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+      tm.init();
       assertEquals(1, tm.getAcceptedIssuers().length);
       X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       assertFalse(reloaderLog.getOutput().contains(
-              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
-
-      // Wait for the first reload to happen so we actually detect a change after the delete
-      Thread.sleep((reloadInterval+ 1000));
-
+          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
       new File(truststoreLocation).delete();
 
-      // Wait for the reload to happen and log to get written to
-      Thread.sleep((reloadInterval+ 1000));
-
-      waitForFailedReloadAtLeastOnce((int) reloadInterval);
+      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      fileMonitoringTimer.cancel();
+      tm.destroy();
     }
   }
 
-
   @Test (timeout = 30000)
   public void testReloadCorruptTrustStore() throws Exception {
     KeyPair kp = generateKeyPair("RSA");
@@ -165,32 +159,29 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testcorrupt.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
-    long reloadInterval = 10;
-    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
     try {
-      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+      tm.init();
       assertEquals(1, tm.getAcceptedIssuers().length);
       final X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       // Wait so that the file modification time is different
-      Thread.sleep((reloadInterval + 1000));
+      Thread.sleep((tm.getReloadInterval() + 1000));
 
       assertFalse(reloaderLog.getOutput().contains(
-              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
       OutputStream os = new FileOutputStream(truststoreLocation);
       os.write(1);
       os.close();
 
-      waitForFailedReloadAtLeastOnce((int) reloadInterval);
+      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      fileMonitoringTimer.cancel();
+      tm.destroy();
     }
   }
 
@@ -203,7 +194,7 @@ public class TestReloadingX509TrustManager {
       @Override
       public Boolean get() {
         return reloaderLog.getOutput().contains(
-            FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
+            ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE);
       }
     }, reloadInterval, 10 * 1000);
   }
@@ -217,15 +208,13 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
-    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     final ReloadingX509TrustManager tm =
-        new ReloadingX509TrustManager("jks", truststoreLocation, null);
+        new ReloadingX509TrustManager("jks", truststoreLocation, null, 10);
     try {
-      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
-              Paths.get(truststoreLocation), tm::loadFrom,null), 10, 10);
+      tm.init();
       assertEquals(1, tm.getAcceptedIssuers().length);
     } finally {
-      fileMonitoringTimer.cancel();
+      tm.destroy();
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
index eebe0ba..108ce50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -100,7 +99,7 @@ public final class TestURLConnectionFactory {
     Thread reloaderThread = null;
     for (Thread thread : threads) {
       if ((thread.getName() != null)
-          && (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
+          && (thread.getName().contains("Truststore reloader thread"))) {
         reloaderThread = thread;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index 4d9c320..9f9564a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
-import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
 import org.apache.hadoop.test.TestGenericTestUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -477,7 +476,7 @@ public class TestTimelineClient {
     Thread reloaderThread = null;
     for (Thread thread : threads) {
       if ((thread.getName() != null)
-          && (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
+          && (thread.getName().contains("Truststore reloader thread"))) {
         reloaderThread = thread;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/pom.xml b/hadoop-yarn-project/hadoop-yarn/pom.xml
index caa9a1b..dff9a2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/pom.xml
@@ -11,8 +11,7 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.hadoop</groupId>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-16524. Automatic keystore reloading for HttpServer2

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 2c482fbacfcdfeeb6643ec753f9e8144269fac46
Author: Borislav Iordanov <bo...@gmail.com>
AuthorDate: Wed Mar 31 10:45:15 2021 -0700

    HADOOP-16524. Automatic keystore reloading for HttpServer2
    
    Reapply of issue reverted first because it caused yarn failures and
    then again because the commit message was incorrectly formatted
    (and yet again because of commit message format).
    
    Signed-off-by: stack <st...@apache.org>
---
 .../java/org/apache/hadoop/http/HttpServer2.java   |  62 +++++-
 .../security/ssl/FileBasedKeyStoresFactory.java    | 222 +++++++++++++--------
 .../security/ssl/FileMonitoringTimerTask.java      |  85 ++++++++
 .../security/ssl/ReloadingX509KeystoreManager.java | 157 +++++++++++++++
 .../security/ssl/ReloadingX509TrustManager.java    |  95 ++-------
 .../security/ssl/TestReloadingX509KeyManager.java  | 205 +++++++++++++++++++
 .../ssl/TestReloadingX509TrustManager.java         |  77 ++++---
 .../hadoop/hdfs/web/TestURLConnectionFactory.java  |   3 +-
 .../yarn/client/api/impl/TestTimelineClient.java   |   3 +-
 hadoop-yarn-project/hadoop-yarn/pom.xml            |   3 +-
 10 files changed, 715 insertions(+), 197 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
index 7534cba..dde27db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
@@ -27,14 +27,17 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Enumeration;
+import java.util.Arrays;
+import java.util.Timer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -75,6 +78,8 @@ import org.apache.hadoop.security.authentication.server.ProxyUserAuthenticationF
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.FileMonitoringTimerTask;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
@@ -186,6 +191,7 @@ public final class HttpServer2 implements FilterContainer {
   static final String STATE_DESCRIPTION_ALIVE = " - alive";
   static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
   private final SignerSecretProvider secretProvider;
+  private final Optional<java.util.Timer> configurationChangeMonitor;
   private XFrameOption xFrameOption;
   private boolean xFrameOptionIsEnabled;
   public static final String HTTP_HEADER_PREFIX = "hadoop.http.header.";
@@ -244,6 +250,8 @@ public final class HttpServer2 implements FilterContainer {
 
     private boolean sniHostCheckEnabled;
 
+    private Optional<Timer> configurationChangeMonitor = Optional.empty();
+
     public Builder setName(String name){
       this.name = name;
       return this;
@@ -574,12 +582,45 @@ public final class HttpServer2 implements FilterContainer {
       }
 
       setEnabledProtocols(sslContextFactory);
+
+      long storesReloadInterval =
+          conf.getLong(FileBasedKeyStoresFactory.SSL_STORES_RELOAD_INTERVAL_TPL_KEY,
+              FileBasedKeyStoresFactory.DEFAULT_SSL_STORES_RELOAD_INTERVAL);
+
+      if (storesReloadInterval > 0) {
+        this.configurationChangeMonitor = Optional.of(
+            this.makeConfigurationChangeMonitor(storesReloadInterval, sslContextFactory));
+      }
+
       conn.addFirstConnectionFactory(new SslConnectionFactory(sslContextFactory,
           HttpVersion.HTTP_1_1.asString()));
 
       return conn;
     }
 
+    private Timer makeConfigurationChangeMonitor(long reloadInterval,
+        SslContextFactory.Server sslContextFactory) {
+      java.util.Timer timer = new java.util.Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
+      //
+      // The Jetty SSLContextFactory provides a 'reload' method which will reload both
+      // truststore and keystore certificates.
+      //
+      timer.schedule(new FileMonitoringTimerTask(
+              Paths.get(keyStore),
+              path -> {
+                LOG.info("Reloading certificates from store keystore " + keyStore);
+                try {
+                  sslContextFactory.reload(factory -> { });
+                } catch (Exception ex) {
+                  LOG.error("Failed to reload SSL keystore certificates", ex);
+                }
+              },null),
+          reloadInterval,
+          reloadInterval
+      );
+      return timer;
+    }
+
     private void setEnabledProtocols(SslContextFactory sslContextFactory) {
       String enabledProtocols = conf.get(SSLFactory.SSL_ENABLED_PROTOCOLS_KEY,
           SSLFactory.SSL_ENABLED_PROTOCOLS_DEFAULT);
@@ -622,6 +663,7 @@ public final class HttpServer2 implements FilterContainer {
     this.webAppContext = createWebAppContext(b, adminsAcl, appDir);
     this.xFrameOptionIsEnabled = b.xFrameEnabled;
     this.xFrameOption = b.xFrameOption;
+    this.configurationChangeMonitor = b.configurationChangeMonitor;
 
     try {
       this.secretProvider =
@@ -1420,6 +1462,16 @@ public final class HttpServer2 implements FilterContainer {
    */
   public void stop() throws Exception {
     MultiException exception = null;
+    if (this.configurationChangeMonitor.isPresent()) {
+      try {
+        this.configurationChangeMonitor.get().cancel();
+      } catch (Exception e) {
+        LOG.error(
+            "Error while canceling configuration monitoring timer for webapp"
+                + webAppContext.getDisplayName(), e);
+        exception = addMultiException(exception, e);
+      }
+    }
     for (ServerConnector c : listeners) {
       try {
         c.close();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
index b184e4a..236d881 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
@@ -29,20 +29,20 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManager;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.text.MessageFormat;
+import java.util.Timer;
 
 /**
  * {@link KeyStoresFactory} implementation that reads the certificates from
  * keystore files.
  * <p>
- * if the trust certificates keystore file changes, the {@link TrustManager}
- * is refreshed with the new trust certificate entries (using a
- * {@link ReloadingX509TrustManager} trustmanager).
+ * If either the truststore or the keystore certificates file changes, it
+ * would be refreshed under the corresponding wrapper implementation -
+ * {@link ReloadingX509KeystoreManager} or {@link ReloadingX509TrustManager}.
+ * </p>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -51,6 +51,19 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(FileBasedKeyStoresFactory.class);
 
+
+  /**
+   * The name of the timer thread monitoring file changes.
+   */
+  public static final String SSL_MONITORING_THREAD_NAME = "SSL Certificates Store Monitor";
+
+  /**
+   * The refresh interval used to check if either of the truststore or keystore
+   * certificate file has changed.
+   */
+  public static final String SSL_STORES_RELOAD_INTERVAL_TPL_KEY =
+    "ssl.{0}.stores.reload.interval";
+
   public static final String SSL_KEYSTORE_LOCATION_TPL_KEY =
     "ssl.{0}.keystore.location";
   public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY =
@@ -77,14 +90,119 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   public static final String DEFAULT_KEYSTORE_TYPE = "jks";
 
   /**
-   * Reload interval in milliseconds.
+   * The default time interval in milliseconds used to check if either
+   * of the truststore or keystore certificates file has changed and needs reloading.
    */
-  public static final int DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL = 10000;
+  public static final int DEFAULT_SSL_STORES_RELOAD_INTERVAL = 10000;
 
   private Configuration conf;
   private KeyManager[] keyManagers;
   private TrustManager[] trustManagers;
   private ReloadingX509TrustManager trustManager;
+  private Timer fileMonitoringTimer;
+
+
+  private void createTrustManagersFromConfiguration(SSLFactory.Mode mode,
+                                                    String truststoreType,
+                                                    String truststoreLocation,
+                                                    long storesReloadInterval)
+      throws IOException, GeneralSecurityException {
+    String passwordProperty = resolvePropertyName(mode,
+        SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
+    String truststorePassword = getPassword(conf, passwordProperty, "");
+    if (truststorePassword.isEmpty()) {
+      // An empty trust store password is legal; the trust store password
+      // is only required when writing to a trust store. Otherwise it's
+      // an optional integrity check.
+      truststorePassword = null;
+    }
+
+    // Check if obsolete truststore specific reload interval is present for backward compatible
+    long truststoreReloadInterval =
+        conf.getLong(
+            resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
+            storesReloadInterval);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation +
+          ", reloading at " + truststoreReloadInterval + " millis.");
+    }
+
+    trustManager = new ReloadingX509TrustManager(
+        truststoreType,
+        truststoreLocation,
+        truststorePassword);
+
+    if (truststoreReloadInterval > 0) {
+      fileMonitoringTimer.schedule(
+          new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation),
+              path -> trustManager.loadFrom(path),
+              exception -> LOG.error(ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE, exception)),
+          truststoreReloadInterval,
+          truststoreReloadInterval);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
+    }
+    trustManagers = new TrustManager[]{trustManager};
+  }
+
+  /**
+   * Implements logic of initializing the KeyManagers with the options
+   * to reload keystores.
+   * @param mode client or server
+   * @param keystoreType The keystore type.
+   * @param storesReloadInterval The interval to check if the keystore certificates
+   *                             file has changed.
+   */
+  private void createKeyManagersFromConfiguration(SSLFactory.Mode mode,
+                                                  String keystoreType, long storesReloadInterval)
+      throws GeneralSecurityException, IOException {
+    String locationProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
+    String keystoreLocation = conf.get(locationProperty, "");
+    if (keystoreLocation.isEmpty()) {
+      throw new GeneralSecurityException("The property '" + locationProperty +
+          "' has not been set in the ssl configuration file.");
+    }
+    String passwordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
+    String keystorePassword = getPassword(conf, passwordProperty, "");
+    if (keystorePassword.isEmpty()) {
+      throw new GeneralSecurityException("The property '" + passwordProperty +
+          "' has not been set in the ssl configuration file.");
+    }
+    String keyPasswordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
+    // Key password defaults to the same value as store password for
+    // compatibility with legacy configurations that did not use a separate
+    // configuration property for key password.
+    String keystoreKeyPassword = getPassword(
+        conf, keyPasswordProperty, keystorePassword);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
+    }
+
+    ReloadingX509KeystoreManager keystoreManager =  new ReloadingX509KeystoreManager(
+        keystoreType,
+        keystoreLocation,
+        keystorePassword,
+        keystoreKeyPassword);
+
+    if (storesReloadInterval > 0) {
+      fileMonitoringTimer.schedule(
+          new FileMonitoringTimerTask(
+              Paths.get(keystoreLocation),
+              path -> keystoreManager.loadFrom(path),
+              exception -> LOG.error(ReloadingX509KeystoreManager.RELOAD_ERROR_MESSAGE, exception)),
+          storesReloadInterval,
+          storesReloadInterval);
+    }
+
+    keyManagers = new KeyManager[] { keystoreManager };
+  }
 
   /**
    * Resolves a property name to its client/server version if applicable.
@@ -139,56 +257,28 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       conf.getBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY,
           SSLFactory.SSL_REQUIRE_CLIENT_CERT_DEFAULT);
 
+    long storesReloadInterval = conf.getLong(
+        resolvePropertyName(mode, SSL_STORES_RELOAD_INTERVAL_TPL_KEY),
+        DEFAULT_SSL_STORES_RELOAD_INTERVAL);
+
+    fileMonitoringTimer = new Timer(SSL_MONITORING_THREAD_NAME, true);
+
     // certificate store
     String keystoreType =
-      conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
-               DEFAULT_KEYSTORE_TYPE);
-    KeyStore keystore = KeyStore.getInstance(keystoreType);
-    String keystoreKeyPassword = null;
-    if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
-      String locationProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
-      String keystoreLocation = conf.get(locationProperty, "");
-      if (keystoreLocation.isEmpty()) {
-        throw new GeneralSecurityException("The property '" + locationProperty +
-          "' has not been set in the ssl configuration file.");
-      }
-      String passwordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
-      String keystorePassword = getPassword(conf, passwordProperty, "");
-      if (keystorePassword.isEmpty()) {
-        throw new GeneralSecurityException("The property '" + passwordProperty +
-          "' has not been set in the ssl configuration file.");
-      }
-      String keyPasswordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
-      // Key password defaults to the same value as store password for
-      // compatibility with legacy configurations that did not use a separate
-      // configuration property for key password.
-      keystoreKeyPassword = getPassword(
-          conf, keyPasswordProperty, keystorePassword);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
-      }
+        conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
+                 DEFAULT_KEYSTORE_TYPE);
 
-      InputStream is = Files.newInputStream(Paths.get(keystoreLocation));
-      try {
-        keystore.load(is, keystorePassword.toCharArray());
-      } finally {
-        is.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " Loaded KeyStore: " + keystoreLocation);
-      }
+    if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
+      createKeyManagersFromConfiguration(mode, keystoreType, storesReloadInterval);
     } else {
+      KeyStore keystore = KeyStore.getInstance(keystoreType);
       keystore.load(null, null);
+      KeyManagerFactory keyMgrFactory = KeyManagerFactory
+              .getInstance(SSLFactory.SSLCERTIFICATE);
+
+      keyMgrFactory.init(keystore, null);
+      keyManagers = keyMgrFactory.getKeyManagers();
     }
-    KeyManagerFactory keyMgrFactory = KeyManagerFactory
-        .getInstance(SSLFactory.SSLCERTIFICATE);
-      
-    keyMgrFactory.init(keystore, (keystoreKeyPassword != null) ?
-                                 keystoreKeyPassword.toCharArray() : null);
-    keyManagers = keyMgrFactory.getKeyManagers();
 
     //trust store
     String truststoreType =
@@ -199,33 +289,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       resolvePropertyName(mode, SSL_TRUSTSTORE_LOCATION_TPL_KEY);
     String truststoreLocation = conf.get(locationProperty, "");
     if (!truststoreLocation.isEmpty()) {
-      String passwordProperty = resolvePropertyName(mode,
-          SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
-      String truststorePassword = getPassword(conf, passwordProperty, "");
-      if (truststorePassword.isEmpty()) {
-        // An empty trust store password is legal; the trust store password
-        // is only required when writing to a trust store. Otherwise it's
-        // an optional integrity check.
-        truststorePassword = null;
-      }
-      long truststoreReloadInterval =
-          conf.getLong(
-              resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
-              DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation);
-      }
-
-      trustManager = new ReloadingX509TrustManager(truststoreType,
-          truststoreLocation,
-          truststorePassword,
-          truststoreReloadInterval);
-      trustManager.init();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
-      }
-      trustManagers = new TrustManager[]{trustManager};
+      createTrustManagersFromConfiguration(mode, truststoreType, truststoreLocation, storesReloadInterval);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The property '" + locationProperty + "' has not been set, " +
@@ -256,7 +320,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   @Override
   public synchronized void destroy() {
     if (trustManager != null) {
-      trustManager.destroy();
+      fileMonitoringTimer.cancel();
       trustManager = null;
       keyManagers = null;
       trustManagers = null;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
new file mode 100644
index 0000000..40b6197
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.TimerTask;
+import java.util.function.Consumer;
+
+/**
+ * Implements basic logic to track when a file changes on disk and call the action
+ * passed to the constructor when it does. An exception handler can optionally also be specified
+ * in the constructor, otherwise any exception occurring during process will be logged
+ * using this class' logger.
+ */
+@InterfaceAudience.Private
+public class FileMonitoringTimerTask extends TimerTask {
+
+  static final Logger LOG = LoggerFactory.getLogger(FileMonitoringTimerTask.class);
+
+  @VisibleForTesting
+  static final String PROCESS_ERROR_MESSAGE =
+      "Could not process file change : ";
+
+  final private Path filePath;
+  final private Consumer<Path> onFileChange;
+  final Consumer<Throwable> onChangeFailure;
+  private long lastProcessed;
+
+  /**
+   * Create file monitoring task to be scheduled using a standard Java {@link java.util.Timer}
+   * instance.
+   *
+   * @param filePath The path to the file to monitor.
+   * @param onFileChange The function to call when the file has changed.
+   * @param onChangeFailure The function to call when an exception is thrown during the
+   *                       file change processing.
+   */
+  public FileMonitoringTimerTask(Path filePath, Consumer<Path> onFileChange,
+      Consumer<Throwable> onChangeFailure) {
+    Preconditions.checkNotNull(filePath, "path to monitor disk file is not set");
+    Preconditions.checkNotNull(onFileChange, "action to monitor disk file is not set");
+
+    this.filePath = filePath;
+    this.lastProcessed = filePath.toFile().lastModified();
+    this.onFileChange = onFileChange;
+    this.onChangeFailure = onChangeFailure;
+  }
+
+  @Override
+  public void run() {
+    if (lastProcessed != filePath.toFile().lastModified()) {
+      try {
+        onFileChange.accept(filePath);
+      } catch (Throwable t) {
+        if (onChangeFailure  != null) {
+          onChangeFailure.accept(t);
+        } else {
+          LOG.error(PROCESS_ERROR_MESSAGE + filePath.toString(), t);
+        }
+      }
+      lastProcessed = filePath.toFile().lastModified();
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
new file mode 100644
index 0000000..72e8b6b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of <code>X509KeyManager</code> that exposes a method,
+ * {@link #loadFrom(Path)} to reload its configuration. Note that it is necessary
+ * to implement the <code>X509ExtendedKeyManager</code> to properly delegate
+ * the additional methods, otherwise the SSL handshake will fail.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReloadingX509KeystoreManager extends X509ExtendedKeyManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
+
+  static final String RELOAD_ERROR_MESSAGE =
+      "Could not load keystore (keep using existing one) : ";
+
+  final private String type;
+  final private String storePassword;
+  final private String keyPassword;
+  private AtomicReference<X509ExtendedKeyManager> keyManagerRef;
+
+  /**
+   * Construct a <code>Reloading509KeystoreManager</code>
+   *
+   * @param type type of keystore file, typically 'jks'.
+   * @param location local path to the keystore file.
+   * @param storePassword password of the keystore file.
+   * @param keyPassword The password of the key.
+   * @throws IOException
+   * @throws GeneralSecurityException
+   */
+  public ReloadingX509KeystoreManager(String type, String location,
+                                      String storePassword, String keyPassword)
+      throws IOException, GeneralSecurityException {
+    this.type = type;
+    this.storePassword = storePassword;
+    this.keyPassword = keyPassword;
+    keyManagerRef = new AtomicReference<X509ExtendedKeyManager>();
+    keyManagerRef.set(loadKeyManager(Paths.get(location)));
+  }
+
+  @Override
+  public String chooseEngineClientAlias(String[] strings, Principal[] principals,
+      SSLEngine sslEngine) {
+    return keyManagerRef.get().chooseEngineClientAlias(strings, principals, sslEngine);
+  }
+
+  @Override
+  public String chooseEngineServerAlias(String s, Principal[] principals,
+      SSLEngine sslEngine) {
+    return keyManagerRef.get().chooseEngineServerAlias(s, principals, sslEngine);
+  }
+
+  @Override
+  public String[] getClientAliases(String s, Principal[] principals) {
+    return keyManagerRef.get().getClientAliases(s, principals);
+  }
+
+  @Override
+  public String chooseClientAlias(String[] strings, Principal[] principals,
+      Socket socket) {
+    return keyManagerRef.get().chooseClientAlias(strings, principals, socket);
+  }
+
+  @Override
+  public String[] getServerAliases(String s, Principal[] principals) {
+    return keyManagerRef.get().getServerAliases(s, principals);
+  }
+
+  @Override
+  public String chooseServerAlias(String s, Principal[] principals,
+      Socket socket) {
+    return keyManagerRef.get().chooseServerAlias(s, principals, socket);
+  }
+
+  @Override
+  public X509Certificate[] getCertificateChain(String s) {
+    return keyManagerRef.get().getCertificateChain(s);
+  }
+
+  @Override
+  public PrivateKey getPrivateKey(String s) {
+    return keyManagerRef.get().getPrivateKey(s);
+  }
+
+  public ReloadingX509KeystoreManager loadFrom(Path path) {
+    try {
+      this.keyManagerRef.set(loadKeyManager(path));
+    } catch (Exception ex) {
+      // The Consumer.accept interface forces us to convert to unchecked
+      throw new RuntimeException(ex);
+    }
+    return this;
+  }
+
+  private X509ExtendedKeyManager loadKeyManager(Path path)
+      throws IOException, GeneralSecurityException {
+
+    X509ExtendedKeyManager keyManager = null;
+    KeyStore keystore = KeyStore.getInstance(type);
+
+    try (InputStream is = Files.newInputStream(path)) {
+      keystore.load(is, this.storePassword.toCharArray());
+    }
+
+    LOG.debug(" Loaded KeyStore: " + path.toFile().getAbsolutePath());
+
+    KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance(
+        SSLFactory.SSLCERTIFICATE);
+    keyMgrFactory.init(keystore,
+        (keyPassword != null) ? keyPassword.toCharArray() : null);
+    for (KeyManager candidate: keyMgrFactory.getKeyManagers()) {
+      if (candidate instanceof X509ExtendedKeyManager) {
+        keyManager = (X509ExtendedKeyManager)candidate;
+        break;
+      }
+    }
+    return keyManager;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
index 7430477..68fd4c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
@@ -32,6 +32,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.cert.CertificateException;
@@ -39,31 +41,23 @@ import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A {@link TrustManager} implementation that reloads its configuration when
- * the truststore file on disk changes.
+ * A {@link TrustManager} implementation that exposes a method, {@link #loadFrom(Path)}
+ * to reload its configuration for example when the truststore file on disk changes.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class ReloadingX509TrustManager
-  implements X509TrustManager, Runnable {
+public final class ReloadingX509TrustManager implements X509TrustManager {
 
-  @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(ReloadingX509TrustManager.class);
-  @VisibleForTesting
+
   static final String RELOAD_ERROR_MESSAGE =
       "Could not load truststore (keep using existing one) : ";
 
   private String type;
-  private File file;
   private String password;
-  private long lastLoaded;
-  private long reloadInterval;
   private AtomicReference<X509TrustManager> trustManagerRef;
 
-  private volatile boolean running;
-  private Thread reloader;
-
   /**
    * Creates a reloadable trustmanager. The trustmanager reloads itself
    * if the underlying trustore file has changed.
@@ -71,49 +65,18 @@ public final class ReloadingX509TrustManager
    * @param type type of truststore file, typically 'jks'.
    * @param location local path to the truststore file.
    * @param password password of the truststore file.
-   * @param reloadInterval interval to check if the truststore file has
    * changed, in milliseconds.
    * @throws IOException thrown if the truststore could not be initialized due
    * to an IO error.
    * @throws GeneralSecurityException thrown if the truststore could not be
    * initialized due to a security error.
    */
-  public ReloadingX509TrustManager(String type, String location,
-                                   String password, long reloadInterval)
+  public ReloadingX509TrustManager(String type, String location, String password)
     throws IOException, GeneralSecurityException {
     this.type = type;
-    file = new File(location);
     this.password = password;
     trustManagerRef = new AtomicReference<X509TrustManager>();
-    trustManagerRef.set(loadTrustManager());
-    this.reloadInterval = reloadInterval;
-  }
-
-  /**
-   * Starts the reloader thread.
-   */
-  public void init() {
-    reloader = new Thread(this, "Truststore reloader thread");
-    reloader.setDaemon(true);
-    running =  true;
-    reloader.start();
-  }
-
-  /**
-   * Stops the reloader thread.
-   */
-  public void destroy() {
-    running = false;
-    reloader.interrupt();
-  }
-
-  /**
-   * Returns the reload check interval.
-   *
-   * @return the reload check interval, in milliseconds.
-   */
-  public long getReloadInterval() {
-    return reloadInterval;
+    trustManagerRef.set(loadTrustManager(Paths.get(location)));
   }
 
   @Override
@@ -151,27 +114,24 @@ public final class ReloadingX509TrustManager
     return issuers;
   }
 
-  boolean needsReload() {
-    boolean reload = true;
-    if (file.exists()) {
-      if (file.lastModified() == lastLoaded) {
-        reload = false;
-      }
-    } else {
-      lastLoaded = 0;
+  public ReloadingX509TrustManager loadFrom(Path path) {
+    try {
+      this.trustManagerRef.set(loadTrustManager(path));
+    } catch (Exception ex) {
+      // The Consumer.accept interface forces us to convert to unchecked
+      throw new RuntimeException(RELOAD_ERROR_MESSAGE, ex);
     }
-    return reload;
+    return this;
   }
 
-  X509TrustManager loadTrustManager()
+  X509TrustManager loadTrustManager(Path path)
   throws IOException, GeneralSecurityException {
     X509TrustManager trustManager = null;
     KeyStore ks = KeyStore.getInstance(type);
-    InputStream in = Files.newInputStream(file.toPath());
+    InputStream in = Files.newInputStream(path);
     try {
       ks.load(in, (password == null) ? null : password.toCharArray());
-      lastLoaded = file.lastModified();
-      LOG.debug("Loaded truststore '" + file + "'");
+      LOG.debug("Loaded truststore '" + path + "'");
     } finally {
       in.close();
     }
@@ -188,23 +148,4 @@ public final class ReloadingX509TrustManager
     }
     return trustManager;
   }
-
-  @Override
-  public void run() {
-    while (running) {
-      try {
-        Thread.sleep(reloadInterval);
-      } catch (InterruptedException e) {
-        //NOP
-      }
-      if (running && needsReload()) {
-        try {
-          trustManagerRef.set(loadTrustManager());
-        } catch (Exception ex) {
-          LOG.warn(RELOAD_ERROR_MESSAGE + ex.toString(), ex);
-        }
-      }
-    }
-  }
-
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
new file mode 100644
index 0000000..bf0a6ab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.util.Timer;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestReloadingX509KeyManager {
+
+    private static final String BASEDIR = GenericTestUtils.getTempPath(
+            TestReloadingX509TrustManager.class.getSimpleName());
+
+    private final GenericTestUtils.LogCapturer reloaderLog = GenericTestUtils.LogCapturer.captureLogs(
+            FileMonitoringTimerTask.LOG);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File base = new File(BASEDIR);
+        FileUtil.fullyDelete(base);
+        base.mkdirs();
+    }
+
+    @Test(expected = IOException.class)
+    public void testLoadMissingKeyStore() throws Exception {
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+    }
+
+    @Test(expected = IOException.class)
+    public void testLoadCorruptKeyStore() throws Exception {
+        String keystoreLocation = BASEDIR + "/testcorrupt.jks";
+        OutputStream os = new FileOutputStream(keystoreLocation);
+        os.write(1);
+        os.close();
+
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+    }
+
+    @Test (timeout = 3000000)
+    public void testReload() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate sCert = generateCertificate("CN=localhost, O=server", kp, 30,
+                        "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testreload.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), sCert);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            // Wait so that the file modification time is different
+            Thread.sleep((reloadInterval+ 1000));
+
+            // Change the certificate with a new keypair
+            final KeyPair anotherKP = generateKeyPair("RSA");
+            sCert = KeyStoreTestUtil.generateCertificate("CN=localhost, O=server", anotherKP, 30,
+                            "SHA1withRSA");
+            createKeyStore(keystoreLocation, "password", "cert1", anotherKP.getPrivate(), sCert);
+
+            GenericTestUtils.waitFor(new Supplier<Boolean>() {
+                @Override
+                public Boolean get() {
+                    return tm.getPrivateKey("cert1").equals(kp.getPrivate());
+                }
+            }, (int) reloadInterval, 100000);
+        } finally {
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+    @Test (timeout = 30000)
+    public void testReloadMissingTrustStore() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            assertFalse(reloaderLog.getOutput().contains(
+                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+
+            // Wait for the first reload to happen so we actually detect a change after the delete
+            Thread.sleep((reloadInterval+ 1000));
+
+            new File(keystoreLocation).delete();
+
+            // Wait for the reload to happen and log to get written to
+            Thread.sleep((reloadInterval+ 1000));
+
+            waitForFailedReloadAtLeastOnce((int) reloadInterval);
+
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+        } finally {
+            reloaderLog.stopCapturing();
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+
+    @Test (timeout = 30000)
+    public void testReloadCorruptTrustStore() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            // Wait so that the file modification time is different
+            Thread.sleep((reloadInterval + 1000));
+
+            assertFalse(reloaderLog.getOutput().contains(
+                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+            OutputStream os = new FileOutputStream(keystoreLocation);
+            os.write(1);
+            os.close();
+
+            waitForFailedReloadAtLeastOnce((int) reloadInterval);
+
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+        } finally {
+            reloaderLog.stopCapturing();
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+    /**Wait for the reloader thread to load the configurations at least once
+     * by probing the log of the thread if the reload fails.
+     */
+    private void waitForFailedReloadAtLeastOnce(int reloadInterval)
+            throws InterruptedException, TimeoutException {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return reloaderLog.getOutput().contains(
+                        FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
+            }
+        }, reloadInterval, 10 * 1000);
+    }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
index 441f552..6358959 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
@@ -30,10 +30,12 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -50,7 +52,7 @@ public class TestReloadingX509TrustManager {
   private X509Certificate cert1;
   private X509Certificate cert2;
   private final LogCapturer reloaderLog = LogCapturer.captureLogs(
-      ReloadingX509TrustManager.LOG);
+      FileMonitoringTimerTask.LOG);
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -64,12 +66,7 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
 
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
-    try {
-      tm.init();
-    } finally {
-      tm.destroy();
-    }
+            new ReloadingX509TrustManager("jks", truststoreLocation, "password");
   }
 
   @Test(expected = IOException.class)
@@ -80,12 +77,7 @@ public class TestReloadingX509TrustManager {
     os.close();
 
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
-    try {
-      tm.init();
-    } finally {
-      tm.destroy();
-    }
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
   }
 
   @Test (timeout = 30000)
@@ -96,14 +88,17 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     final ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
 
       // Wait so that the file modification time is different
-      Thread.sleep((tm.getReloadInterval() + 1000));
+      Thread.sleep((reloadInterval+ 1000));
 
       // Add another cert
       Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
@@ -116,9 +111,9 @@ public class TestReloadingX509TrustManager {
         public Boolean get() {
           return tm.getAcceptedIssuers().length == 2;
         }
-      }, (int) tm.getReloadInterval(), 10000);
+      }, (int) reloadInterval, 100000);
     } finally {
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
@@ -130,27 +125,38 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
       X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       assertFalse(reloaderLog.getOutput().contains(
-          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
+              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+
+      // Wait for the first reload to happen so we actually detect a change after the delete
+      Thread.sleep((reloadInterval+ 1000));
+
       new File(truststoreLocation).delete();
 
-      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
+      // Wait for the reload to happen and log to get written to
+      Thread.sleep((reloadInterval+ 1000));
+
+      waitForFailedReloadAtLeastOnce((int) reloadInterval);
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
+
   @Test (timeout = 30000)
   public void testReloadCorruptTrustStore() throws Exception {
     KeyPair kp = generateKeyPair("RSA");
@@ -159,29 +165,32 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testcorrupt.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
       final X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       // Wait so that the file modification time is different
-      Thread.sleep((tm.getReloadInterval() + 1000));
+      Thread.sleep((reloadInterval + 1000));
 
       assertFalse(reloaderLog.getOutput().contains(
-          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
+              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
       OutputStream os = new FileOutputStream(truststoreLocation);
       os.write(1);
       os.close();
 
-      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
+      waitForFailedReloadAtLeastOnce((int) reloadInterval);
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
@@ -194,7 +203,7 @@ public class TestReloadingX509TrustManager {
       @Override
       public Boolean get() {
         return reloaderLog.getOutput().contains(
-            ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE);
+            FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
       }
     }, reloadInterval, 10 * 1000);
   }
@@ -208,13 +217,15 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
     final ReloadingX509TrustManager tm =
-        new ReloadingX509TrustManager("jks", truststoreLocation, null, 10);
+        new ReloadingX509TrustManager("jks", truststoreLocation, null);
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), 10, 10);
       assertEquals(1, tm.getAcceptedIssuers().length);
     } finally {
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
index 108ce50..eebe0ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -99,7 +100,7 @@ public final class TestURLConnectionFactory {
     Thread reloaderThread = null;
     for (Thread thread : threads) {
       if ((thread.getName() != null)
-          && (thread.getName().contains("Truststore reloader thread"))) {
+          && (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
         reloaderThread = thread;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index 9f9564a..4d9c320 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
 import org.apache.hadoop.test.TestGenericTestUtils;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -476,7 +477,7 @@ public class TestTimelineClient {
     Thread reloaderThread = null;
     for (Thread thread : threads) {
       if ((thread.getName() != null)
-          && (thread.getName().contains("Truststore reloader thread"))) {
+          && (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
         reloaderThread = thread;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/pom.xml b/hadoop-yarn-project/hadoop-yarn/pom.xml
index dff9a2b..caa9a1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/pom.xml
@@ -11,7 +11,8 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License. See accompanying LICENSE file.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.hadoop</groupId>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org