You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/08 21:46:05 UTC

[hadoop] branch branch-3.3 updated: HADOOP-16524. Reloading SSL keystore for both DataNode and NameNode (#2470) (#2609)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new f046ed2  HADOOP-16524. Reloading SSL keystore for both DataNode and NameNode (#2470) (#2609)
f046ed2 is described below

commit f046ed27d691f18a8d05045d8e018a3579d4c570
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Fri Jan 8 13:45:44 2021 -0800

    HADOOP-16524. Reloading SSL keystore for both DataNode and NameNode (#2470) (#2609)
    
    Co-authored-by: Borislav Iordanov <bi...@apple.com>
    Signed-off-by: stack <st...@apache.org>
    
    Co-authored-by: Borislav Iordanov <bo...@gmail.com>
    Co-authored-by: Borislav Iordanov <bi...@apple.com>
---
 .../java/org/apache/hadoop/http/HttpServer2.java   |  62 +++++-
 .../security/ssl/FileBasedKeyStoresFactory.java    | 216 +++++++++++++--------
 .../security/ssl/FileMonitoringTimerTask.java      |  85 ++++++++
 .../security/ssl/ReloadingX509KeystoreManager.java | 157 +++++++++++++++
 .../security/ssl/ReloadingX509TrustManager.java    |  95 ++-------
 .../security/ssl/TestReloadingX509KeyManager.java  | 205 +++++++++++++++++++
 .../ssl/TestReloadingX509TrustManager.java         |  77 ++++----
 7 files changed, 703 insertions(+), 194 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
index cdc2a74..39f5bac 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
@@ -27,14 +27,17 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Enumeration;
+import java.util.Arrays;
+import java.util.Timer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -74,6 +77,8 @@ import org.apache.hadoop.security.authentication.server.ProxyUserAuthenticationF
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.FileMonitoringTimerTask;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
@@ -184,6 +189,7 @@ public final class HttpServer2 implements FilterContainer {
   static final String STATE_DESCRIPTION_ALIVE = " - alive";
   static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
   private final SignerSecretProvider secretProvider;
+  private final Optional<java.util.Timer> configurationChangeMonitor;
   private XFrameOption xFrameOption;
   private boolean xFrameOptionIsEnabled;
   public static final String HTTP_HEADER_PREFIX = "hadoop.http.header.";
@@ -239,6 +245,8 @@ public final class HttpServer2 implements FilterContainer {
 
     private boolean sniHostCheckEnabled;
 
+    private Optional<Timer> configurationChangeMonitor = Optional.empty();
+
     public Builder setName(String name){
       this.name = name;
       return this;
@@ -569,12 +577,45 @@ public final class HttpServer2 implements FilterContainer {
       }
 
       setEnabledProtocols(sslContextFactory);
+
+      long storesReloadInterval =
+          conf.getLong(FileBasedKeyStoresFactory.SSL_STORES_RELOAD_INTERVAL_TPL_KEY,
+              FileBasedKeyStoresFactory.DEFAULT_SSL_STORES_RELOAD_INTERVAL);
+
+      if (storesReloadInterval > 0) {
+        this.configurationChangeMonitor = Optional.of(
+            this.makeConfigurationChangeMonitor(storesReloadInterval, sslContextFactory));
+      }
+
       conn.addFirstConnectionFactory(new SslConnectionFactory(sslContextFactory,
           HttpVersion.HTTP_1_1.asString()));
 
       return conn;
     }
 
+    private Timer makeConfigurationChangeMonitor(long reloadInterval,
+        SslContextFactory.Server sslContextFactory) {
+      Timer timer = new Timer("SSL Certificates Store Monitor", true);
+      //
+      // The Jetty SSLContextFactory provides a 'reload' method which will reload both
+      // truststore and keystore certificates.
+      //
+      timer.schedule(new FileMonitoringTimerTask(
+              Paths.get(keyStore),
+              path -> {
+                LOG.info("Reloading certificates from store keystore " + keyStore);
+                try {
+                  sslContextFactory.reload(factory -> { });
+                } catch (Exception ex) {
+                  LOG.error("Failed to reload SSL keystore certificates", ex);
+                }
+              },null),
+          reloadInterval,
+          reloadInterval
+      );
+      return timer;
+    }
+
     private void setEnabledProtocols(SslContextFactory sslContextFactory) {
       String enabledProtocols = conf.get(SSLFactory.SSL_ENABLED_PROTOCOLS_KEY,
           SSLFactory.SSL_ENABLED_PROTOCOLS_DEFAULT);
@@ -617,6 +658,7 @@ public final class HttpServer2 implements FilterContainer {
     this.webAppContext = createWebAppContext(b, adminsAcl, appDir);
     this.xFrameOptionIsEnabled = b.xFrameEnabled;
     this.xFrameOption = b.xFrameOption;
+    this.configurationChangeMonitor = b.configurationChangeMonitor;
 
     try {
       this.secretProvider =
@@ -1384,6 +1426,16 @@ public final class HttpServer2 implements FilterContainer {
    */
   public void stop() throws Exception {
     MultiException exception = null;
+    if (this.configurationChangeMonitor.isPresent()) {
+      try {
+        this.configurationChangeMonitor.get().cancel();
+      } catch (Exception e) {
+        LOG.error(
+            "Error while canceling configuration monitoring timer for webapp"
+                + webAppContext.getDisplayName(), e);
+        exception = addMultiException(exception, e);
+      }
+    }
     for (ServerConnector c : listeners) {
       try {
         c.close();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
index b184e4a..457f63a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java
@@ -29,20 +29,20 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManager;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.text.MessageFormat;
+import java.util.Timer;
 
 /**
  * {@link KeyStoresFactory} implementation that reads the certificates from
  * keystore files.
  * <p>
- * if the trust certificates keystore file changes, the {@link TrustManager}
- * is refreshed with the new trust certificate entries (using a
- * {@link ReloadingX509TrustManager} trustmanager).
+ * If either the truststore or the keystore certificates file changes, it
+ * would be refreshed under the corresponding wrapper implementation -
+ * {@link ReloadingX509KeystoreManager} or {@link ReloadingX509TrustManager}.
+ * </p>
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -51,6 +51,13 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(FileBasedKeyStoresFactory.class);
 
+  /**
+   * The refresh interval used to check if either of the truststore or keystore
+   * certificate file has changed.
+   */
+  public static final String SSL_STORES_RELOAD_INTERVAL_TPL_KEY =
+    "ssl.{0}.stores.reload.interval";
+
   public static final String SSL_KEYSTORE_LOCATION_TPL_KEY =
     "ssl.{0}.keystore.location";
   public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY =
@@ -77,14 +84,119 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   public static final String DEFAULT_KEYSTORE_TYPE = "jks";
 
   /**
-   * Reload interval in milliseconds.
+   * The default time interval in milliseconds used to check if either
+   * of the truststore or keystore certificates file has changed and needs reloading.
    */
-  public static final int DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL = 10000;
+  public static final int DEFAULT_SSL_STORES_RELOAD_INTERVAL = 10000;
 
   private Configuration conf;
   private KeyManager[] keyManagers;
   private TrustManager[] trustManagers;
   private ReloadingX509TrustManager trustManager;
+  private Timer fileMonitoringTimer;
+
+
+  private void createTrustManagersFromConfiguration(SSLFactory.Mode mode,
+                                                    String truststoreType,
+                                                    String truststoreLocation,
+                                                    long storesReloadInterval)
+      throws IOException, GeneralSecurityException {
+    String passwordProperty = resolvePropertyName(mode,
+        SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
+    String truststorePassword = getPassword(conf, passwordProperty, "");
+    if (truststorePassword.isEmpty()) {
+      // An empty trust store password is legal; the trust store password
+      // is only required when writing to a trust store. Otherwise it's
+      // an optional integrity check.
+      truststorePassword = null;
+    }
+
+    // Check if obsolete truststore specific reload interval is present for backward compatible
+    long truststoreReloadInterval =
+        conf.getLong(
+            resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
+            storesReloadInterval);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation +
+          ", reloading at " + truststoreReloadInterval + " millis.");
+    }
+
+    trustManager = new ReloadingX509TrustManager(
+        truststoreType,
+        truststoreLocation,
+        truststorePassword);
+
+    if (truststoreReloadInterval > 0) {
+      fileMonitoringTimer.schedule(
+          new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation),
+              path -> trustManager.loadFrom(path),
+              exception -> LOG.error(ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE, exception)),
+          truststoreReloadInterval,
+          truststoreReloadInterval);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
+    }
+    trustManagers = new TrustManager[]{trustManager};
+  }
+
+  /**
+   * Implements logic of initializing the KeyManagers with the options
+   * to reload keystores.
+   * @param mode client or server
+   * @param keystoreType The keystore type.
+   * @param storesReloadInterval The interval to check if the keystore certificates
+   *                             file has changed.
+   */
+  private void createKeyManagersFromConfiguration(SSLFactory.Mode mode,
+                                                  String keystoreType, long storesReloadInterval)
+      throws GeneralSecurityException, IOException {
+    String locationProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
+    String keystoreLocation = conf.get(locationProperty, "");
+    if (keystoreLocation.isEmpty()) {
+      throw new GeneralSecurityException("The property '" + locationProperty +
+          "' has not been set in the ssl configuration file.");
+    }
+    String passwordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
+    String keystorePassword = getPassword(conf, passwordProperty, "");
+    if (keystorePassword.isEmpty()) {
+      throw new GeneralSecurityException("The property '" + passwordProperty +
+          "' has not been set in the ssl configuration file.");
+    }
+    String keyPasswordProperty =
+        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
+    // Key password defaults to the same value as store password for
+    // compatibility with legacy configurations that did not use a separate
+    // configuration property for key password.
+    String keystoreKeyPassword = getPassword(
+        conf, keyPasswordProperty, keystorePassword);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
+    }
+
+    ReloadingX509KeystoreManager keystoreManager =  new ReloadingX509KeystoreManager(
+        keystoreType,
+        keystoreLocation,
+        keystorePassword,
+        keystoreKeyPassword);
+
+    if (storesReloadInterval > 0) {
+      fileMonitoringTimer.schedule(
+          new FileMonitoringTimerTask(
+              Paths.get(keystoreLocation),
+              path -> keystoreManager.loadFrom(path),
+              exception -> LOG.error(ReloadingX509KeystoreManager.RELOAD_ERROR_MESSAGE, exception)),
+          storesReloadInterval,
+          storesReloadInterval);
+    }
+
+    keyManagers = new KeyManager[] { keystoreManager };
+  }
 
   /**
    * Resolves a property name to its client/server version if applicable.
@@ -139,56 +251,28 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       conf.getBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY,
           SSLFactory.SSL_REQUIRE_CLIENT_CERT_DEFAULT);
 
+    long storesReloadInterval = conf.getLong(
+        resolvePropertyName(mode, SSL_STORES_RELOAD_INTERVAL_TPL_KEY),
+        DEFAULT_SSL_STORES_RELOAD_INTERVAL);
+
+    fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
+
     // certificate store
     String keystoreType =
-      conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
-               DEFAULT_KEYSTORE_TYPE);
-    KeyStore keystore = KeyStore.getInstance(keystoreType);
-    String keystoreKeyPassword = null;
-    if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
-      String locationProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
-      String keystoreLocation = conf.get(locationProperty, "");
-      if (keystoreLocation.isEmpty()) {
-        throw new GeneralSecurityException("The property '" + locationProperty +
-          "' has not been set in the ssl configuration file.");
-      }
-      String passwordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
-      String keystorePassword = getPassword(conf, passwordProperty, "");
-      if (keystorePassword.isEmpty()) {
-        throw new GeneralSecurityException("The property '" + passwordProperty +
-          "' has not been set in the ssl configuration file.");
-      }
-      String keyPasswordProperty =
-        resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
-      // Key password defaults to the same value as store password for
-      // compatibility with legacy configurations that did not use a separate
-      // configuration property for key password.
-      keystoreKeyPassword = getPassword(
-          conf, keyPasswordProperty, keystorePassword);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
-      }
+        conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
+                 DEFAULT_KEYSTORE_TYPE);
 
-      InputStream is = Files.newInputStream(Paths.get(keystoreLocation));
-      try {
-        keystore.load(is, keystorePassword.toCharArray());
-      } finally {
-        is.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " Loaded KeyStore: " + keystoreLocation);
-      }
+    if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
+      createKeyManagersFromConfiguration(mode, keystoreType, storesReloadInterval);
     } else {
+      KeyStore keystore = KeyStore.getInstance(keystoreType);
       keystore.load(null, null);
+      KeyManagerFactory keyMgrFactory = KeyManagerFactory
+              .getInstance(SSLFactory.SSLCERTIFICATE);
+
+      keyMgrFactory.init(keystore, null);
+      keyManagers = keyMgrFactory.getKeyManagers();
     }
-    KeyManagerFactory keyMgrFactory = KeyManagerFactory
-        .getInstance(SSLFactory.SSLCERTIFICATE);
-      
-    keyMgrFactory.init(keystore, (keystoreKeyPassword != null) ?
-                                 keystoreKeyPassword.toCharArray() : null);
-    keyManagers = keyMgrFactory.getKeyManagers();
 
     //trust store
     String truststoreType =
@@ -199,33 +283,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
       resolvePropertyName(mode, SSL_TRUSTSTORE_LOCATION_TPL_KEY);
     String truststoreLocation = conf.get(locationProperty, "");
     if (!truststoreLocation.isEmpty()) {
-      String passwordProperty = resolvePropertyName(mode,
-          SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
-      String truststorePassword = getPassword(conf, passwordProperty, "");
-      if (truststorePassword.isEmpty()) {
-        // An empty trust store password is legal; the trust store password
-        // is only required when writing to a trust store. Otherwise it's
-        // an optional integrity check.
-        truststorePassword = null;
-      }
-      long truststoreReloadInterval =
-          conf.getLong(
-              resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
-              DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation);
-      }
-
-      trustManager = new ReloadingX509TrustManager(truststoreType,
-          truststoreLocation,
-          truststorePassword,
-          truststoreReloadInterval);
-      trustManager.init();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
-      }
-      trustManagers = new TrustManager[]{trustManager};
+      createTrustManagersFromConfiguration(mode, truststoreType, truststoreLocation, storesReloadInterval);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The property '" + locationProperty + "' has not been set, " +
@@ -256,7 +314,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
   @Override
   public synchronized void destroy() {
     if (trustManager != null) {
-      trustManager.destroy();
+      fileMonitoringTimer.cancel();
       trustManager = null;
       keyManagers = null;
       trustManagers = null;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
new file mode 100644
index 0000000..40b6197
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileMonitoringTimerTask.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.TimerTask;
+import java.util.function.Consumer;
+
+/**
+ * Implements basic logic to track when a file changes on disk and call the action
+ * passed to the constructor when it does. An exception handler can optionally also be specified
+ * in the constructor, otherwise any exception occurring during process will be logged
+ * using this class' logger.
+ */
+@InterfaceAudience.Private
+public class FileMonitoringTimerTask extends TimerTask {
+
+  static final Logger LOG = LoggerFactory.getLogger(FileMonitoringTimerTask.class);
+
+  @VisibleForTesting
+  static final String PROCESS_ERROR_MESSAGE =
+      "Could not process file change : ";
+
+  final private Path filePath;
+  final private Consumer<Path> onFileChange;
+  final Consumer<Throwable> onChangeFailure;
+  private long lastProcessed;
+
+  /**
+   * Create file monitoring task to be scheduled using a standard Java {@link java.util.Timer}
+   * instance.
+   *
+   * @param filePath The path to the file to monitor.
+   * @param onFileChange The function to call when the file has changed.
+   * @param onChangeFailure The function to call when an exception is thrown during the
+   *                       file change processing.
+   */
+  public FileMonitoringTimerTask(Path filePath, Consumer<Path> onFileChange,
+      Consumer<Throwable> onChangeFailure) {
+    Preconditions.checkNotNull(filePath, "path to monitor disk file is not set");
+    Preconditions.checkNotNull(onFileChange, "action to monitor disk file is not set");
+
+    this.filePath = filePath;
+    this.lastProcessed = filePath.toFile().lastModified();
+    this.onFileChange = onFileChange;
+    this.onChangeFailure = onChangeFailure;
+  }
+
+  @Override
+  public void run() {
+    if (lastProcessed != filePath.toFile().lastModified()) {
+      try {
+        onFileChange.accept(filePath);
+      } catch (Throwable t) {
+        if (onChangeFailure  != null) {
+          onChangeFailure.accept(t);
+        } else {
+          LOG.error(PROCESS_ERROR_MESSAGE + filePath.toString(), t);
+        }
+      }
+      lastProcessed = filePath.toFile().lastModified();
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
new file mode 100644
index 0000000..72e8b6b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509KeystoreManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of <code>X509KeyManager</code> that exposes a method,
+ * {@link #loadFrom(Path)} to reload its configuration. Note that it is necessary
+ * to implement the <code>X509ExtendedKeyManager</code> to properly delegate
+ * the additional methods, otherwise the SSL handshake will fail.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReloadingX509KeystoreManager extends X509ExtendedKeyManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
+
+  static final String RELOAD_ERROR_MESSAGE =
+      "Could not load keystore (keep using existing one) : ";
+
+  final private String type;
+  final private String storePassword;
+  final private String keyPassword;
+  private AtomicReference<X509ExtendedKeyManager> keyManagerRef;
+
+  /**
+   * Construct a <code>Reloading509KeystoreManager</code>
+   *
+   * @param type type of keystore file, typically 'jks'.
+   * @param location local path to the keystore file.
+   * @param storePassword password of the keystore file.
+   * @param keyPassword The password of the key.
+   * @throws IOException
+   * @throws GeneralSecurityException
+   */
+  public ReloadingX509KeystoreManager(String type, String location,
+                                      String storePassword, String keyPassword)
+      throws IOException, GeneralSecurityException {
+    this.type = type;
+    this.storePassword = storePassword;
+    this.keyPassword = keyPassword;
+    keyManagerRef = new AtomicReference<X509ExtendedKeyManager>();
+    keyManagerRef.set(loadKeyManager(Paths.get(location)));
+  }
+
+  @Override
+  public String chooseEngineClientAlias(String[] strings, Principal[] principals,
+      SSLEngine sslEngine) {
+    return keyManagerRef.get().chooseEngineClientAlias(strings, principals, sslEngine);
+  }
+
+  @Override
+  public String chooseEngineServerAlias(String s, Principal[] principals,
+      SSLEngine sslEngine) {
+    return keyManagerRef.get().chooseEngineServerAlias(s, principals, sslEngine);
+  }
+
+  @Override
+  public String[] getClientAliases(String s, Principal[] principals) {
+    return keyManagerRef.get().getClientAliases(s, principals);
+  }
+
+  @Override
+  public String chooseClientAlias(String[] strings, Principal[] principals,
+      Socket socket) {
+    return keyManagerRef.get().chooseClientAlias(strings, principals, socket);
+  }
+
+  @Override
+  public String[] getServerAliases(String s, Principal[] principals) {
+    return keyManagerRef.get().getServerAliases(s, principals);
+  }
+
+  @Override
+  public String chooseServerAlias(String s, Principal[] principals,
+      Socket socket) {
+    return keyManagerRef.get().chooseServerAlias(s, principals, socket);
+  }
+
+  @Override
+  public X509Certificate[] getCertificateChain(String s) {
+    return keyManagerRef.get().getCertificateChain(s);
+  }
+
+  @Override
+  public PrivateKey getPrivateKey(String s) {
+    return keyManagerRef.get().getPrivateKey(s);
+  }
+
+  public ReloadingX509KeystoreManager loadFrom(Path path) {
+    try {
+      this.keyManagerRef.set(loadKeyManager(path));
+    } catch (Exception ex) {
+      // The Consumer.accept interface forces us to convert to unchecked
+      throw new RuntimeException(ex);
+    }
+    return this;
+  }
+
+  private X509ExtendedKeyManager loadKeyManager(Path path)
+      throws IOException, GeneralSecurityException {
+
+    X509ExtendedKeyManager keyManager = null;
+    KeyStore keystore = KeyStore.getInstance(type);
+
+    try (InputStream is = Files.newInputStream(path)) {
+      keystore.load(is, this.storePassword.toCharArray());
+    }
+
+    LOG.debug(" Loaded KeyStore: " + path.toFile().getAbsolutePath());
+
+    KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance(
+        SSLFactory.SSLCERTIFICATE);
+    keyMgrFactory.init(keystore,
+        (keyPassword != null) ? keyPassword.toCharArray() : null);
+    for (KeyManager candidate: keyMgrFactory.getKeyManagers()) {
+      if (candidate instanceof X509ExtendedKeyManager) {
+        keyManager = (X509ExtendedKeyManager)candidate;
+        break;
+      }
+    }
+    return keyManager;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
index 7430477..68fd4c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/ReloadingX509TrustManager.java
@@ -32,6 +32,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.cert.CertificateException;
@@ -39,31 +41,23 @@ import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A {@link TrustManager} implementation that reloads its configuration when
- * the truststore file on disk changes.
+ * A {@link TrustManager} implementation that exposes a method, {@link #loadFrom(Path)}
+ * to reload its configuration for example when the truststore file on disk changes.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class ReloadingX509TrustManager
-  implements X509TrustManager, Runnable {
+public final class ReloadingX509TrustManager implements X509TrustManager {
 
-  @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(ReloadingX509TrustManager.class);
-  @VisibleForTesting
+
   static final String RELOAD_ERROR_MESSAGE =
       "Could not load truststore (keep using existing one) : ";
 
   private String type;
-  private File file;
   private String password;
-  private long lastLoaded;
-  private long reloadInterval;
   private AtomicReference<X509TrustManager> trustManagerRef;
 
-  private volatile boolean running;
-  private Thread reloader;
-
   /**
    * Creates a reloadable trustmanager. The trustmanager reloads itself
    * if the underlying trustore file has changed.
@@ -71,49 +65,18 @@ public final class ReloadingX509TrustManager
    * @param type type of truststore file, typically 'jks'.
    * @param location local path to the truststore file.
    * @param password password of the truststore file.
-   * @param reloadInterval interval to check if the truststore file has
    * changed, in milliseconds.
    * @throws IOException thrown if the truststore could not be initialized due
    * to an IO error.
    * @throws GeneralSecurityException thrown if the truststore could not be
    * initialized due to a security error.
    */
-  public ReloadingX509TrustManager(String type, String location,
-                                   String password, long reloadInterval)
+  public ReloadingX509TrustManager(String type, String location, String password)
     throws IOException, GeneralSecurityException {
     this.type = type;
-    file = new File(location);
     this.password = password;
     trustManagerRef = new AtomicReference<X509TrustManager>();
-    trustManagerRef.set(loadTrustManager());
-    this.reloadInterval = reloadInterval;
-  }
-
-  /**
-   * Starts the reloader thread.
-   */
-  public void init() {
-    reloader = new Thread(this, "Truststore reloader thread");
-    reloader.setDaemon(true);
-    running =  true;
-    reloader.start();
-  }
-
-  /**
-   * Stops the reloader thread.
-   */
-  public void destroy() {
-    running = false;
-    reloader.interrupt();
-  }
-
-  /**
-   * Returns the reload check interval.
-   *
-   * @return the reload check interval, in milliseconds.
-   */
-  public long getReloadInterval() {
-    return reloadInterval;
+    trustManagerRef.set(loadTrustManager(Paths.get(location)));
   }
 
   @Override
@@ -151,27 +114,24 @@ public final class ReloadingX509TrustManager
     return issuers;
   }
 
-  boolean needsReload() {
-    boolean reload = true;
-    if (file.exists()) {
-      if (file.lastModified() == lastLoaded) {
-        reload = false;
-      }
-    } else {
-      lastLoaded = 0;
+  public ReloadingX509TrustManager loadFrom(Path path) {
+    try {
+      this.trustManagerRef.set(loadTrustManager(path));
+    } catch (Exception ex) {
+      // The Consumer.accept interface forces us to convert to unchecked
+      throw new RuntimeException(RELOAD_ERROR_MESSAGE, ex);
     }
-    return reload;
+    return this;
   }
 
-  X509TrustManager loadTrustManager()
+  X509TrustManager loadTrustManager(Path path)
   throws IOException, GeneralSecurityException {
     X509TrustManager trustManager = null;
     KeyStore ks = KeyStore.getInstance(type);
-    InputStream in = Files.newInputStream(file.toPath());
+    InputStream in = Files.newInputStream(path);
     try {
       ks.load(in, (password == null) ? null : password.toCharArray());
-      lastLoaded = file.lastModified();
-      LOG.debug("Loaded truststore '" + file + "'");
+      LOG.debug("Loaded truststore '" + path + "'");
     } finally {
       in.close();
     }
@@ -188,23 +148,4 @@ public final class ReloadingX509TrustManager
     }
     return trustManager;
   }
-
-  @Override
-  public void run() {
-    while (running) {
-      try {
-        Thread.sleep(reloadInterval);
-      } catch (InterruptedException e) {
-        //NOP
-      }
-      if (running && needsReload()) {
-        try {
-          trustManagerRef.set(loadTrustManager());
-        } catch (Exception ex) {
-          LOG.warn(RELOAD_ERROR_MESSAGE + ex.toString(), ex);
-        }
-      }
-    }
-  }
-
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
new file mode 100644
index 0000000..7561ef0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509KeyManager.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.ssl;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.util.Timer;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestReloadingX509KeyManager {
+
+    private static final String BASEDIR = GenericTestUtils.getTempPath(
+            TestReloadingX509TrustManager.class.getSimpleName());
+
+    private final GenericTestUtils.LogCapturer reloaderLog = GenericTestUtils.LogCapturer.captureLogs(
+            FileMonitoringTimerTask.LOG);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File base = new File(BASEDIR);
+        FileUtil.fullyDelete(base);
+        base.mkdirs();
+    }
+
+    @Test(expected = IOException.class)
+    public void testLoadMissingKeyStore() throws Exception {
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+    }
+
+    @Test(expected = IOException.class)
+    public void testLoadCorruptKeyStore() throws Exception {
+        String keystoreLocation = BASEDIR + "/testcorrupt.jks";
+        OutputStream os = new FileOutputStream(keystoreLocation);
+        os.write(1);
+        os.close();
+
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+    }
+
+    @Test (timeout = 3000000)
+    public void testReload() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate sCert = generateCertificate("CN=localhost, O=server", kp, 30,
+                        "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testreload.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), sCert);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            // Wait so that the file modification time is different
+            Thread.sleep((reloadInterval+ 1000));
+
+            // Change the certificate with a new keypair
+            final KeyPair anotherKP = generateKeyPair("RSA");
+            sCert = KeyStoreTestUtil.generateCertificate("CN=localhost, O=server", anotherKP, 30,
+                            "SHA1withRSA");
+            createKeyStore(keystoreLocation, "password", "cert1", anotherKP.getPrivate(), sCert);
+
+            GenericTestUtils.waitFor(new Supplier<Boolean>() {
+                @Override
+                public Boolean get() {
+                    return tm.getPrivateKey("cert1").equals(kp.getPrivate());
+                }
+            }, (int) reloadInterval, 100000);
+        } finally {
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+    @Test (timeout = 30000)
+    public void testReloadMissingTrustStore() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            assertFalse(reloaderLog.getOutput().contains(
+                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+
+            // Wait for the first reload to happen so we actually detect a change after the delete
+            Thread.sleep((reloadInterval+ 1000));
+
+            new File(keystoreLocation).delete();
+
+            // Wait for the reload to happen and log to get written to
+            Thread.sleep((reloadInterval+ 1000));
+
+            waitForFailedReloadAtLeastOnce((int) reloadInterval);
+
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+        } finally {
+            reloaderLog.stopCapturing();
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+
+    @Test (timeout = 30000)
+    public void testReloadCorruptTrustStore() throws Exception {
+        KeyPair kp = generateKeyPair("RSA");
+        X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+        String keystoreLocation = BASEDIR + "/testmissing.jks";
+        createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
+
+        long reloadInterval = 10;
+        Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
+        ReloadingX509KeystoreManager tm =
+                new ReloadingX509KeystoreManager("jks", keystoreLocation,
+                        "password",
+                        "password");
+        try {
+            fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+                    Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+
+            // Wait so that the file modification time is different
+            Thread.sleep((reloadInterval + 1000));
+
+            assertFalse(reloaderLog.getOutput().contains(
+                    FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+            OutputStream os = new FileOutputStream(keystoreLocation);
+            os.write(1);
+            os.close();
+
+            waitForFailedReloadAtLeastOnce((int) reloadInterval);
+
+            assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
+        } finally {
+            reloaderLog.stopCapturing();
+            fileMonitoringTimer.cancel();
+        }
+    }
+
+    /**Wait for the reloader thread to load the configurations at least once
+     * by probing the log of the thread if the reload fails.
+     */
+    private void waitForFailedReloadAtLeastOnce(int reloadInterval)
+            throws InterruptedException, TimeoutException {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return reloaderLog.getOutput().contains(
+                        FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
+            }
+        }, reloadInterval, 10 * 1000);
+    }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
index 441f552..7c2f065 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestReloadingX509TrustManager.java
@@ -30,10 +30,12 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -50,7 +52,7 @@ public class TestReloadingX509TrustManager {
   private X509Certificate cert1;
   private X509Certificate cert2;
   private final LogCapturer reloaderLog = LogCapturer.captureLogs(
-      ReloadingX509TrustManager.LOG);
+      FileMonitoringTimerTask.LOG);
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -64,12 +66,7 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
 
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
-    try {
-      tm.init();
-    } finally {
-      tm.destroy();
-    }
+            new ReloadingX509TrustManager("jks", truststoreLocation, "password");
   }
 
   @Test(expected = IOException.class)
@@ -80,12 +77,7 @@ public class TestReloadingX509TrustManager {
     os.close();
 
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
-    try {
-      tm.init();
-    } finally {
-      tm.destroy();
-    }
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
   }
 
   @Test (timeout = 30000)
@@ -96,14 +88,17 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
     final ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
 
       // Wait so that the file modification time is different
-      Thread.sleep((tm.getReloadInterval() + 1000));
+      Thread.sleep((reloadInterval+ 1000));
 
       // Add another cert
       Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
@@ -116,9 +111,9 @@ public class TestReloadingX509TrustManager {
         public Boolean get() {
           return tm.getAcceptedIssuers().length == 2;
         }
-      }, (int) tm.getReloadInterval(), 10000);
+      }, (int) reloadInterval, 100000);
     } finally {
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
@@ -130,27 +125,38 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testmissing.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
       X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       assertFalse(reloaderLog.getOutput().contains(
-          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
+              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
+
+      // Wait for the first reload to happen so we actually detect a change after the delete
+      Thread.sleep((reloadInterval+ 1000));
+
       new File(truststoreLocation).delete();
 
-      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
+      // Wait for the reload to happen and log to get written to
+      Thread.sleep((reloadInterval+ 1000));
+
+      waitForFailedReloadAtLeastOnce((int) reloadInterval);
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
+
   @Test (timeout = 30000)
   public void testReloadCorruptTrustStore() throws Exception {
     KeyPair kp = generateKeyPair("RSA");
@@ -159,29 +165,32 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testcorrupt.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    long reloadInterval = 10;
+    Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
     ReloadingX509TrustManager tm =
-      new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
+      new ReloadingX509TrustManager("jks", truststoreLocation, "password");
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
       assertEquals(1, tm.getAcceptedIssuers().length);
       final X509Certificate cert = tm.getAcceptedIssuers()[0];
 
       // Wait so that the file modification time is different
-      Thread.sleep((tm.getReloadInterval() + 1000));
+      Thread.sleep((reloadInterval + 1000));
 
       assertFalse(reloaderLog.getOutput().contains(
-          ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
+              FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
       OutputStream os = new FileOutputStream(truststoreLocation);
       os.write(1);
       os.close();
 
-      waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
+      waitForFailedReloadAtLeastOnce((int) reloadInterval);
 
       assertEquals(1, tm.getAcceptedIssuers().length);
       assertEquals(cert, tm.getAcceptedIssuers()[0]);
     } finally {
       reloaderLog.stopCapturing();
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 
@@ -194,7 +203,7 @@ public class TestReloadingX509TrustManager {
       @Override
       public Boolean get() {
         return reloaderLog.getOutput().contains(
-            ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE);
+            FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
       }
     }, reloadInterval, 10 * 1000);
   }
@@ -208,13 +217,15 @@ public class TestReloadingX509TrustManager {
     String truststoreLocation = BASEDIR + "/testreload.jks";
     createTrustStore(truststoreLocation, "password", "cert1", cert1);
 
+    Timer fileMonitoringTimer = new Timer("SSL Certificates Store Monitor", true);
     final ReloadingX509TrustManager tm =
-        new ReloadingX509TrustManager("jks", truststoreLocation, null, 10);
+        new ReloadingX509TrustManager("jks", truststoreLocation, null);
     try {
-      tm.init();
+      fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
+              Paths.get(truststoreLocation), tm::loadFrom,null), 10, 10);
       assertEquals(1, tm.getAcceptedIssuers().length);
     } finally {
-      tm.destroy();
+      fileMonitoringTimer.cancel();
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org