Posted to reviews@spark.apache.org by "hasnain-db (via GitHub)" <gi...@apache.org> on 2023/10/06 15:53:32 UTC

[PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

hasnain-db opened a new pull request, #43249:
URL: https://github.com/apache/spark/pull/43249

   ### What changes were proposed in this pull request?
   
   This adds support for trust store reloading, mirroring the Hadoop implementation (see the source comments for a link). I believe reusing the existing code instead of adding a dependency is fine license-wise (see https://github.com/apache/spark/pull/42685/files#r1333667328).
   
   ### Why are the changes needed?
   
   This lets us refresh trust stores without downtime.
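A reloading trust manager avoids downtime because an `SSLContext` calls back into the same `TrustManager` object for every certificate check. Replacing that object's internal delegate therefore changes what later handshakes trust without rebuilding the context. Below is a minimal sketch of the idea; it is not the Hadoop or Spark class. `SwappableTrustManager` and `SwapDemo` are hypothetical names. The file-watching thread is left out, and the example uses empty in-memory key stores so it runs without certificate files.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

// Hypothetical illustration: an X509TrustManager whose delegate can be swapped at runtime.
final class SwappableTrustManager implements X509TrustManager {
  private final AtomicReference<X509TrustManager> delegate = new AtomicReference<>();

  SwappableTrustManager(KeyStore initial) throws GeneralSecurityException {
    reload(initial);
  }

  // Build a fresh delegate from the given KeyStore and publish it atomically.
  void reload(KeyStore ks) throws GeneralSecurityException {
    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(ks);
    for (TrustManager tm : tmf.getTrustManagers()) {
      if (tm instanceof X509TrustManager) {
        delegate.set((X509TrustManager) tm);
        return;
      }
    }
    throw new IllegalStateException("No X509TrustManager available");
  }

  @Override
  public void checkClientTrusted(X509Certificate[] chain, String authType)
      throws CertificateException {
    delegate.get().checkClientTrusted(chain, authType);
  }

  @Override
  public void checkServerTrusted(X509Certificate[] chain, String authType)
      throws CertificateException {
    delegate.get().checkServerTrusted(chain, authType);
  }

  @Override
  public X509Certificate[] getAcceptedIssuers() {
    return delegate.get().getAcceptedIssuers();
  }
}

final class SwapDemo {
  // An empty, in-memory KeyStore of the platform default type.
  static KeyStore emptyStore() throws IOException, GeneralSecurityException {
    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    ks.load(null, null);
    return ks;
  }

  public static void main(String[] args) throws Exception {
    SwappableTrustManager tm = new SwappableTrustManager(emptyStore());
    SSLContext ctx = SSLContext.getInstance("TLS");
    // The context calls back into tm for each certificate check, so later swaps are picked up.
    ctx.init(null, new TrustManager[] {tm}, null);
    tm.reload(emptyStore());
    System.out.println("issuers after reload: " + tm.getAcceptedIssuers().length);
  }
}
```

The `AtomicReference` gives handshake threads a consistent delegate without locking. The quoted `ReloadingX509TrustManager` keeps an `AtomicReference<X509TrustManager>` field for the same purpose.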
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Added unit tests (also copied from upstream)
   
   ```
   build/sbt
   > project network-common
   > testOnly org.apache.spark.network.ssl.ReloadingX509TrustManagerSuite
   ``` 
   
   The rest of the changes and integration were tested as part of https://github.com/apache/spark/pull/42685
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #43249: [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager
URL: https://github.com/apache/spark/pull/43249



Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on code in PR #43249:
URL: https://github.com/apache/spark/pull/43249#discussion_r1348906596


##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review Comment:
   I'll fix the spacing here; I didn't want to wait for another CI cycle before getting reviews.




Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1751926927

   @mridulm thanks for the review here. I copied this code as-is from Hadoop with no modifications, on the basis that it is well tested and widely used, so we avoid any risk from changing the implementation. I'm happy to change the implementation here (and potentially upstream), but wanted to double-check that you prefer that over keeping compatibility with a tested implementation. Please let me know.



Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1760643710

   The test failure does not look relevant, but can you retrigger the tests, please?



Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1752146127

   If this is to be part of Apache Spark codebase, divergence is to be expected as code evolves and we add functionality relevant to Spark.



Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1752150779

   Thanks! Will address the comments and take a more detailed look in that case.



Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43249:
URL: https://github.com/apache/spark/pull/43249#discussion_r1349604356


##########
common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link TrustManager} implementation that reloads its configuration when
+ * the truststore file on disk changes.
+ * This implementation is based entirely on the
+ * org.apache.hadoop.security.ssl.ReloadingX509TrustManager class in the Apache Hadoop Encrypted
+ * Shuffle implementation.
+ *
+ * @see <a href="https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Hadoop MapReduce Next Generation - Encrypted Shuffle</a>
+ */
+public final class ReloadingX509TrustManager
+        implements X509TrustManager, Runnable {
+
+  private final Logger logger = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
+
+  private String type;
+  private File file;
+  private String password;
+  private long lastLoaded;
+  private long reloadInterval;
+  private AtomicReference<X509TrustManager> trustManagerRef;
+
+  private volatile boolean running;
+  private Thread reloader;
+
+  /**
+   * Creates a reloadable trustmanager. The trustmanager reloads itself
+   * if the underlying truststore file has changed.
+   *
+   * @param type           type of truststore file, typically 'jks'.
+   * @param trustStore     the truststore file.
+   * @param password       password of the truststore file.
+   * @param reloadInterval interval to check if the truststore file has
+   *                       changed, in milliseconds.
+   * @throws IOException              thrown if the truststore could not be initialized due
+   *                                  to an IO error.
+   * @throws GeneralSecurityException thrown if the truststore could not be
+   *                                  initialized due to a security error.
+   */
+  public ReloadingX509TrustManager(
+      String type, File trustStore, String password, long reloadInterval)
+      throws IOException, GeneralSecurityException {
+    this.type = type;
+    file = trustStore;
+    this.password = password;
+    trustManagerRef = new AtomicReference<X509TrustManager>();
+    trustManagerRef.set(loadTrustManager());
+    this.reloadInterval = reloadInterval;
+  }
+
+  /**
+   * Starts the reloader thread.
+   */
+  public void init() {
+    reloader = new Thread(this, "Truststore reloader thread");
+    reloader.setDaemon(true);
+    running = true;
+    reloader.start();
+  }
+
+  /**
+   * Stops the reloader thread.
+   */
+  public void destroy() {
+    running = false;
+    reloader.interrupt();
+  }
+
+  /**
+   * Returns the reload check interval.
+   *
+   * @return the reload check interval, in milliseconds.
+   */
+  public long getReloadInterval() {
+    return reloadInterval;
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType)
+          throws CertificateException {
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      tm.checkClientTrusted(chain, authType);
+    } else {
+      throw new CertificateException("Unknown client chain certificate: " +
+        chain[0].toString());
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType)
+          throws CertificateException {
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      tm.checkServerTrusted(chain, authType);
+    } else {
+      throw new CertificateException("Unknown server chain certificate: " +
+              chain[0].toString());
+    }
+  }
+
+  private static final X509Certificate[] EMPTY = new X509Certificate[0];
+
+  @Override
+  public X509Certificate[] getAcceptedIssuers() {
+    X509Certificate[] issuers = EMPTY;
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      issuers = tm.getAcceptedIssuers();
+    }
+    return issuers;
+  }
+
+  boolean needsReload() {
+    boolean reload = true;
+    if (file.exists()) {
+      if (file.lastModified() == lastLoaded) {
+        reload = false;
+      }
+    } else {
+      lastLoaded = 0;
+    }
+    return reload;
+  }
+
+  X509TrustManager loadTrustManager()
+          throws IOException, GeneralSecurityException {
+    X509TrustManager trustManager = null;
+    KeyStore ks = KeyStore.getInstance(type);
+    lastLoaded = file.lastModified();
+    FileInputStream in = new FileInputStream(file);
+    try {

Review Comment:
   nit: Use try-with-resources instead of the try/finally here.
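Here is one way the suggestion could look when applied to the quoted `loadTrustManager`. The quoted diff stops right after `try {`, so everything after `ks.load(...)` in this sketch is an assumption. It assumes the method builds an `X509TrustManager` through a `TrustManagerFactory` with the default algorithm and treats a null password as "no password". `TrustStoreLoader` is a hypothetical name, and the `lastLoaded` bookkeeping is left out.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

// Hypothetical helper mirroring the shape of loadTrustManager().
final class TrustStoreLoader {
  static X509TrustManager load(String type, File file, String password)
      throws IOException, GeneralSecurityException {
    KeyStore ks = KeyStore.getInstance(type);
    // try-with-resources closes the stream even if KeyStore.load throws,
    // replacing the explicit try/finally { in.close(); }.
    try (InputStream in = new FileInputStream(file)) {
      ks.load(in, password == null ? null : password.toCharArray());
    }
    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(ks);
    for (TrustManager tm : tmf.getTrustManagers()) {
      if (tm instanceof X509TrustManager) {
        return (X509TrustManager) tm;
      }
    }
    return null; // the factory offered no X509TrustManager
  }
}
```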



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+
+import static org.apache.spark.network.ssl.SslSampleConfigs.*;
+
+/**
+ *
+ */
+public class ReloadingX509TrustManagerSuite {
+
+  /**
+   * Tests to ensure that loading a missing trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadMissingTrustStore() throws Exception {
+    File trustStore = new File("testmissing.jks");
+    assertFalse(trustStore.exists());
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        trustStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * Tests to ensure that loading a corrupt trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadCorruptTrustStore() throws Exception {
+    File corruptStore = File.createTempFile("truststore-corrupt", "jks");
+    corruptStore.deleteOnExit();
+    OutputStream os = new FileOutputStream(corruptStore);
+    os.write(1);
+    os.close();
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        corruptStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testReload() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
+    File trustStore = File.createTempFile("testreload", "jks");
+    trustStore.deleteOnExit();
+    createTrustStore(trustStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm =
+      new ReloadingX509TrustManager("jks", trustStore, "password", 10);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+
+      // Wait so that the file modification time is different
+      Thread.sleep((tm.getReloadInterval() + 1000));
+
+      // Add another cert
+      Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+      certs.put("cert1", cert1);
+      certs.put("cert2", cert2);
+      createTrustStore(trustStore, "password", certs);
+
+      // and wait to be sure reload has taken place
+      assertEquals(10, tm.getReloadInterval());

Review Comment:
   nit: Move this to after the creation of `tm`?



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+
+import static org.apache.spark.network.ssl.SslSampleConfigs.*;
+
+/**
+ *
+ */
+public class ReloadingX509TrustManagerSuite {
+
+  /**
+   * Tests to ensure that loading a missing trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadMissingTrustStore() throws Exception {
+    File trustStore = new File("testmissing.jks");
+    assertFalse(trustStore.exists());
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        trustStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * Tests to ensure that loading a corrupt trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadCorruptTrustStore() throws Exception {
+    File corruptStore = File.createTempFile("truststore-corrupt", "jks");
+    corruptStore.deleteOnExit();
+    OutputStream os = new FileOutputStream(corruptStore);
+    os.write(1);
+    os.close();
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        corruptStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testReload() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
+    File trustStore = File.createTempFile("testreload", "jks");
+    trustStore.deleteOnExit();
+    createTrustStore(trustStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm =
+      new ReloadingX509TrustManager("jks", trustStore, "password", 10);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+
+      // Wait so that the file modification time is different
+      Thread.sleep((tm.getReloadInterval() + 1000));
+
+      // Add another cert
+      Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+      certs.put("cert1", cert1);
+      certs.put("cert2", cert2);
+      createTrustStore(trustStore, "password", certs);
+
+      // and wait to be sure reload has taken place
+      assertEquals(10, tm.getReloadInterval());
+
+      // Wait so that the file modification time is different
+      Thread.sleep((tm.getReloadInterval() + 200));

Review Comment:
   Instead of relying on timing, which can become flaky if the reloader is delayed, I would suggest adding a counter that tracks how many times a reload has happened, and waiting for that counter to change.
   
   This also reduces the number of arbitrary sleep values across the tests.
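Here is one possible implementation of the counter idea, with several assumptions. `CountingReloader` and `waitForReloads` are hypothetical names. The loop only mimics the shape of the reloader thread by polling `lastModified()` once per interval, and the actual reload of trust material is left out. The test helper polls the counter against a deadline instead of sleeping for a fixed `reloadInterval + N` milliseconds.

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the reloader loop: watches a file's mtime and
// counts how many reloads actually happened.
final class CountingReloader implements Runnable {
  private final File file;
  private final long intervalMs;
  private final AtomicInteger reloadCount = new AtomicInteger();
  private long lastLoaded; // only touched by the reloader thread after start()
  private volatile boolean running = true;

  CountingReloader(File file, long intervalMs) {
    this.file = file;
    this.intervalMs = intervalMs;
    this.lastLoaded = file.lastModified();
  }

  int reloadCount() {
    return reloadCount.get();
  }

  void stop() {
    running = false;
  }

  @Override
  public void run() {
    while (running) {
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        return; // interrupted by the owner: exit the loop
      }
      long mtime = file.lastModified();
      if (mtime != lastLoaded) {
        lastLoaded = mtime;
        // ... reload the trust material here (omitted in this sketch) ...
        reloadCount.incrementAndGet(); // bump only after the reload completed
      }
    }
  }

  // Test helper: wait until at least `target` reloads were observed, or fail after timeoutMs.
  static void waitForReloads(CountingReloader r, int target, long timeoutMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (r.reloadCount() < target) {
      if (System.nanoTime() - deadline > 0) {
        throw new AssertionError("timed out waiting for reload #" + target);
      }
      Thread.sleep(10);
    }
  }
}
```

In a test, you can also bump the file's modification time explicitly with `File.setLastModified` (by a few seconds). That removes the dependence on mtime granularity, which the original test works around with its one-second sleep.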



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+
+import static org.apache.spark.network.ssl.SslSampleConfigs.*;
+
+/**
+ *
+ */
+public class ReloadingX509TrustManagerSuite {
+
+  /**
+   * Tests to ensure that loading a missing trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadMissingTrustStore() throws Exception {
+    File trustStore = new File("testmissing.jks");
+    assertFalse(trustStore.exists());
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        trustStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * Tests to ensure that loading a corrupt trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadCorruptTrustStore() throws Exception {
+    File corruptStore = File.createTempFile("truststore-corrupt", "jks");
+    corruptStore.deleteOnExit();
+    OutputStream os = new FileOutputStream(corruptStore);
+    os.write(1);
+    os.close();
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        corruptStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testReload() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
+    File trustStore = File.createTempFile("testreload", "jks");
+    trustStore.deleteOnExit();
+    createTrustStore(trustStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm =
+      new ReloadingX509TrustManager("jks", trustStore, "password", 10);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+
+      // Wait so that the file modification time is different
+      Thread.sleep((tm.getReloadInterval() + 1000));
+
+      // Add another cert
+      Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+      certs.put("cert1", cert1);
+      certs.put("cert2", cert2);
+      createTrustStore(trustStore, "password", certs);

Review Comment:
   `saveKeyStore`, which `createTrustStore` relies on, overwrites the specified file in place.
   This can make the test flaky if the reloader happens to read the file while it is being written.
   
   Either we should make the write atomic, or ensure we wait for truststore creation to complete before reloading (with hooks that the tests extend to enforce the wait).
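The following sketches the first option. It assumes the test helper is allowed to write through a temporary file. The new store is written next to the target and then renamed over it, so a concurrent reader opens either the old file or the complete new one. `AtomicKeyStoreWriter.saveAtomically` is a hypothetical helper, not the existing `saveKeyStore`, and the hook-based alternative is not shown. The `Files.move` Javadoc says that with `ATOMIC_MOVE`, whether an existing target gets replaced is implementation specific. POSIX `rename` does replace it, but this should be verified on other platforms.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical helper: write the KeyStore to a sibling temp file, then
// atomically rename it over the target so readers never see a partial file.
final class AtomicKeyStoreWriter {
  static void saveAtomically(KeyStore ks, Path target, char[] password)
      throws IOException, GeneralSecurityException {
    // Same directory as the target, so the rename stays on one file system.
    Path dir = target.toAbsolutePath().getParent();
    Path tmp = Files.createTempFile(dir, target.getFileName().toString(), ".tmp");
    try {
      try (OutputStream out = Files.newOutputStream(tmp)) {
        ks.store(out, password);
      }
      // With ATOMIC_MOVE, other copy options are ignored; replacing an existing
      // target is platform dependent (rename(2) replaces it on POSIX).
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    } finally {
      Files.deleteIfExists(tmp); // no-op after a successful move
    }
  }
}
```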



##########
common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link TrustManager} implementation that reloads its configuration when
+ * the truststore file on disk changes.
+ * This implementation is based entirely on the
+ * org.apache.hadoop.security.ssl.ReloadingX509TrustManager class in the Apache Hadoop Encrypted
+ * Shuffle implementation.
+ *
+ * @see <a href="https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Hadoop MapReduce Next Generation - Encrypted Shuffle</a>
+ */
+public final class ReloadingX509TrustManager
+        implements X509TrustManager, Runnable {
+
+  private final Logger logger = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
+
+  private String type;
+  private File file;
+  private String password;
+  private long lastLoaded;
+  private long reloadInterval;
+  private AtomicReference<X509TrustManager> trustManagerRef;
+
+  private volatile boolean running;
+  private Thread reloader;

Review Comment:
   nit: Make the immutable variables `final`?
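In the quoted code, `type`, `file`, `password`, `reloadInterval`, and `trustManagerRef` are assigned once in the constructor, so they can be `final`. `lastLoaded` cannot, because it is rewritten on every load. Neither can `running` (toggled by `init()`/`destroy()`) or `reloader` (created in `init()`). The class below is a hypothetical, stripped-down version that shows only that field layout.

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.X509TrustManager;

// Hypothetical field layout: state assigned once in the constructor is final;
// only state mutated after construction stays non-final.
final class TrustManagerFields {
  private final String type;
  private final File file;
  private final String password;
  private final long reloadInterval;
  private final AtomicReference<X509TrustManager> trustManagerRef = new AtomicReference<>();

  private long lastLoaded;          // updated on every (re)load
  private volatile boolean running; // toggled by init()/destroy()
  private Thread reloader;          // created in init(), not in the constructor

  TrustManagerFields(String type, File file, String password, long reloadInterval) {
    this.type = type;
    this.file = file;
    this.password = password;
    this.reloadInterval = reloadInterval;
  }
}
```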



##########
common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link TrustManager} implementation that reloads its configuration when
+ * the truststore file on disk changes.
+ * This implementation is based entirely on the
+ * org.apache.hadoop.security.ssl.ReloadingX509TrustManager class in the Apache Hadoop Encrypted
+ * Shuffle implementation.
+ *
+ * @see <a href="https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Hadoop MapReduce Next Generation - Encrypted Shuffle</a>
+ */
+public final class ReloadingX509TrustManager
+        implements X509TrustManager, Runnable {
+
+  private final Logger logger = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
+
+  private String type;
+  private File file;
+  private String password;
+  private long lastLoaded;
+  private long reloadInterval;
+  private AtomicReference<X509TrustManager> trustManagerRef;
+
+  private volatile boolean running;
+  private Thread reloader;
+
+  /**
+   * Creates a reloadable trustmanager. The trustmanager reloads itself
+   * if the underlying trustore file has changed.
+   *
+   * @param type           type of truststore file, typically 'jks'.
+   * @param trustStore     the truststore file.
+   * @param password       password of the truststore file.
+   * @param reloadInterval interval to check if the truststore file has
+   *                       changed, in milliseconds.
+   * @throws IOException              thrown if the truststore could not be initialized due
+   *                                  to an IO error.
+   * @throws GeneralSecurityException thrown if the truststore could not be
+   *                                  initialized due to a security error.
+   */
+  public ReloadingX509TrustManager(
+      String type, File trustStore, String password, long reloadInterval)
+      throws IOException, GeneralSecurityException {
+    this.type = type;
+    file = trustStore;
+    this.password = password;
+    trustManagerRef = new AtomicReference<X509TrustManager>();
+    trustManagerRef.set(loadTrustManager());
+    this.reloadInterval = reloadInterval;
+  }
+
+  /**
+   * Starts the reloader thread.
+   */
+  public void init() {
+    reloader = new Thread(this, "Truststore reloader thread");
+    reloader.setDaemon(true);
+    running = true;
+    reloader.start();
+  }
+
+  /**
+   * Stops the reloader thread.
+   */
+  public void destroy() {
+    running = false;
+    reloader.interrupt();
+  }
+
+  /**
+   * Returns the reload check interval.
+   *
+   * @return the reload check interval, in milliseconds.
+   */
+  public long getReloadInterval() {
+    return reloadInterval;
+  }
+
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType)
+          throws CertificateException {
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      tm.checkClientTrusted(chain, authType);
+    } else {
+      throw new CertificateException("Unknown client chain certificate: " +
+        chain[0].toString());
+    }
+  }
+
+  @Override
+  public void checkServerTrusted(X509Certificate[] chain, String authType)
+          throws CertificateException {
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      tm.checkServerTrusted(chain, authType);
+    } else {
+      throw new CertificateException("Unknown server chain certificate: " +
+              chain[0].toString());
+    }
+  }
+
+  private static final X509Certificate[] EMPTY = new X509Certificate[0];
+
+  @Override
+  public X509Certificate[] getAcceptedIssuers() {
+    X509Certificate[] issuers = EMPTY;
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      issuers = tm.getAcceptedIssuers();
+    }
+    return issuers;
+  }
+
+  boolean needsReload() {
+    boolean reload = true;
+    if (file.exists()) {
+      if (file.lastModified() == lastLoaded) {

Review Comment:
   In general, we can have two cases:
   1. `file` is a regular file, and it is updated with new contents.
   1. `file` is a symbolic link, and the trust store can be updated in two ways:
      1. The underlying file is updated with a new version.
      1. The symbolic link is repointed to a new file.
   
   This method works fine for the first case.
   For a symbolic link, the behavior depends on whether `lastModified` returns the timestamp of the link itself or of the file it points to.
   We can handle this as follows:
   
   a) Keep track of the canonical file that `file` pointed to when it was last checked.
   b) In `needsReload`, skip the reload if `file` still points to the same canonical file and the last-modified time of that canonical file has not changed.
   
   Thoughts?
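   
   A minimal sketch of (a) and (b), assuming a standalone helper: `CanonicalReloadCheck` and `markLoaded` are hypothetical names, not part of the PR. It uses `java.nio.file.Path.toRealPath()` (which follows symbolic links) as the "canonical file", and keeps the diff's default of treating a missing file as needing a reload.
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   
   /**
    * Hypothetical helper (not in the PR): remember the real path the configured
    * file resolved to at the last load, and report a reload only if that target
    * or its last-modified time has changed.
    */
   final class CanonicalReloadCheck {
     private final Path file;
     private Path lastTarget;
     private long lastLoaded;
   
     CanonicalReloadCheck(Path file) throws IOException {
       this.file = file;
       markLoaded();
     }
   
     /** Records the current target and its mtime; call after each successful load. */
     void markLoaded() throws IOException {
       lastTarget = file.toRealPath();  // resolves symbolic links
       lastLoaded = Files.getLastModifiedTime(lastTarget).toMillis();
     }
   
     boolean needsReload() throws IOException {
       if (!Files.exists(file)) {
         // Same default as the diff: a missing file is treated as needing a reload.
         return true;
       }
       Path target = file.toRealPath();
       // Reload if the link now points elsewhere, or the same target was modified.
       return !target.equals(lastTarget)
           || Files.getLastModifiedTime(target).toMillis() != lastLoaded;
     }
   }
   ```
   
   Using NIO here (rather than `File.getCanonicalFile()`) is a design choice in this sketch; older JDKs may cache `File` canonicalization results, which could hide a repointed link for a while.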
   



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+
+import static org.apache.spark.network.ssl.SslSampleConfigs.*;
+
+/**
+ *
+ */
+public class ReloadingX509TrustManagerSuite {
+
+  /**
+   * Tests to ensure that loading a missing trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadMissingTrustStore() throws Exception {
+    File trustStore = new File("testmissing.jks");
+    assertFalse(trustStore.exists());
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        trustStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });
+  }
+
+  /**
+   * Tests to ensure that loading a corrupt trust-store fails
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testLoadCorruptTrustStore() throws Exception {
+    File corruptStore = File.createTempFile("truststore-corrupt", "jks");
+    corruptStore.deleteOnExit();
+    OutputStream os = new FileOutputStream(corruptStore);
+    os.write(1);
+    os.close();
+
+    assertThrows(IOException.class, () -> {
+      ReloadingX509TrustManager tm = new ReloadingX509TrustManager(
+        KeyStore.getDefaultType(),
+        corruptStore,
+        "password",
+        10
+      );
+      try {
+        tm.init();
+      } finally {
+        tm.destroy();
+      }
+    });

Review Comment:
   super nit: we can call `corruptStore.delete()` at the end of the method (here and in the other tests) to clean up in the typical case.
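   
   One way to do that, sketched with a hypothetical `CorruptStoreFixture.withCorruptStore` helper (not part of the PR): write the corrupt store, run the test body, and delete the file in a `finally` block, keeping `deleteOnExit()` only as a fallback.
   
   ```java
   import java.io.File;
   import java.io.FileOutputStream;
   import java.io.OutputStream;
   
   /** Hypothetical helper showing eager cleanup of a temporary trust store. */
   final class CorruptStoreFixture {
     /** Body of a test that receives the temporary store. */
     interface StoreTest {
       void run(File store) throws Exception;
     }
   
     /** Writes a one-byte (corrupt) store, runs the test, then deletes the file. */
     static File withCorruptStore(StoreTest test) throws Exception {
       File corruptStore = File.createTempFile("truststore-corrupt", "jks");
       // Fallback only: runs if the JVM exits (e.g. System.exit) before finally does.
       corruptStore.deleteOnExit();
       try {
         try (OutputStream os = new FileOutputStream(corruptStore)) {
           os.write(1);
         }
         test.run(corruptStore);
       } finally {
         // Typical-case cleanup, as suggested above.
         corruptStore.delete();
       }
       return corruptStore;
     }
   }
   ```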



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testReload() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
+    File trustStore = File.createTempFile("testreload", "jks");
+    trustStore.deleteOnExit();
+    createTrustStore(trustStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm =
+      new ReloadingX509TrustManager("jks", trustStore, "password", 10);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+
+      // Wait so that the file modification time is different
+      Thread.sleep((tm.getReloadInterval() + 1000));

Review Comment:
   `sleep` can be woken up spuriously, which can make this test flaky.
   Ensure it has actually slept for the requested time (see `SystemClock.waitTillTime` for an example).
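   
   A minimal sketch of a "sleep at least this long" helper in the spirit of `SystemClock.waitTillTime`; `SleepUtil.sleepAtLeast` is a hypothetical name. Unlike a `System.currentTimeMillis()` loop, this version uses the monotonic `System.nanoTime()` so wall-clock adjustments cannot shorten the wait.
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical test helper: sleep until the full duration has elapsed. */
   final class SleepUtil {
     static void sleepAtLeast(long millis) throws InterruptedException {
       long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
       long remaining;
       // Re-check after every wake-up and sleep again for whatever time is left.
       while ((remaining = deadline - System.nanoTime()) > 0) {
         TimeUnit.NANOSECONDS.sleep(remaining);
       }
     }
   }
   ```
   
   In the test this would replace the bare call, e.g. `SleepUtil.sleepAtLeast(tm.getReloadInterval() + 1000)`.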



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,198 @@
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testReload() throws Exception {

Review Comment:
   Comments on this test should apply to others below as well.



##########
common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -0,0 +1,201 @@
+  /**
+   * Stops the reloader thread.
+   */
+  public void destroy() {
+    running = false;
+    reloader.interrupt();

Review Comment:
   Call `join` on the `reloader` thread after `interrupt`.
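   
   A trimmed-down, hypothetical `ReloaderSketch` (not the PR's class) showing the suggested shutdown order: clear `running`, interrupt, then `join`. Letting `destroy()` throw `InterruptedException` is an assumption of this sketch; an alternative is to catch it and restore the caller's interrupt flag.
   
   ```java
   /** Hypothetical, trimmed-down reloader showing interrupt followed by join. */
   final class ReloaderSketch implements Runnable {
     private volatile boolean running;
     private Thread reloader;
   
     void init() {
       reloader = new Thread(this, "Truststore reloader thread");
       reloader.setDaemon(true);
       running = true;
       reloader.start();
     }
   
     void destroy() throws InterruptedException {
       running = false;
       reloader.interrupt();
       // Block until run() has returned, so the thread is gone when destroy() returns.
       reloader.join();
     }
   
     boolean isReloaderAlive() {
       return reloader != null && reloader.isAlive();
     }
   
     @Override
     public void run() {
       while (running) {
         try {
           Thread.sleep(10);  // stands in for the reload interval
         } catch (InterruptedException e) {
           // Woken by destroy(): skip the reload step and re-check `running`.
           continue;
         }
         // The real class would check needsReload() and reload here.
       }
     }
   }
   ```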



##########
common/network-common/src/main/java/org/apache/spark/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -0,0 +1,201 @@
+  @Override
+  public void checkClientTrusted(X509Certificate[] chain, String authType)
+          throws CertificateException {
+    X509TrustManager tm = trustManagerRef.get();
+    if (tm != null) {
+      tm.checkClientTrusted(chain, authType);
+    } else {
+      throw new CertificateException("Unknown client chain certificate: " +
+        chain[0].toString());

Review Comment:
   The `else` branch here (and in `checkServerTrusted`) indicates a configuration issue: an incorrect or invalid trust store was specified. Say so in the exception message as well, to help users debug the issue.
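   
   A sketch of what a more actionable message could look like, using a hypothetical static helper `TrustCheckSketch.checkClientTrusted`; the exact wording is an assumption, not the PR's final text. It also avoids failing with an index or null-pointer error while building the message for an empty chain.
   
   ```java
   import java.security.cert.CertificateException;
   import java.security.cert.X509Certificate;
   import javax.net.ssl.X509TrustManager;
   
   /** Hypothetical helper: delegate if a trust manager is loaded, else point at the config. */
   final class TrustCheckSketch {
     static void checkClientTrusted(X509TrustManager tm, X509Certificate[] chain, String authType)
         throws CertificateException {
       if (tm != null) {
         tm.checkClientTrusted(chain, authType);
       } else {
         // Per the review, a missing trust manager means the trust store config is wrong.
         throw new CertificateException("Unknown client chain certificate: "
             + describe(chain)
             + ". No trust store is loaded; check that a valid trust store is configured.");
       }
     }
   
     private static String describe(X509Certificate[] chain) {
       // Build the message safely even for a null or empty chain.
       return (chain == null || chain.length == 0) ? "<empty chain>" : String.valueOf(chain[0]);
     }
   }
   ```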





Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43249:
URL: https://github.com/apache/spark/pull/43249#discussion_r1357178566


##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import static org.apache.spark.network.ssl.SslSampleConfigs.*;
+
+/**
+ *
+ */
+public class ReloadingX509TrustManagerSuite {

Review Comment:
   Remove empty javadoc, or add relevant details.



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,320 @@
+  private final Logger logger = LoggerFactory.getLogger(ReloadingX509TrustManagerSuite.class);
+
+  /**
+   * Waits until reload count hits the requested value, sleeping 100ms at a time.
+   * If the maximum number of attempts is hit, throws a RuntimeException
+   * @param tm the trust manager to wait for
+   * @param count The count to wait for
+   * @param attempts The number of attempts to wait for
+   */
+  private void waitForReloadCount(ReloadingX509TrustManager tm, int count, int attempts)

Review Comment:
   We are using 1 second as the timeout for these tests. Since the tests return early once the required condition is reached, a longer timeout costs nothing in the common case, so we should probably bump it to minimize flakiness when the nodes are loaded.
   
   Unfortunately, the values in our tests are all over the place (3, 5, 10, etc.). I would suggest using 5s, so that we don't need to revisit this unless there is an actual failure rather than flakiness.
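   
   A minimal sketch of a deadline-based poll with a single 5-second default, assuming a hypothetical `WaitUtil.waitUntil` helper (not in the PR). Because it returns as soon as the condition holds, the larger timeout only matters when the machine is slow.
   
   ```java
   import java.time.Duration;
   import java.util.function.BooleanSupplier;
   
   /** Hypothetical polling helper with one generous default timeout. */
   final class WaitUtil {
     // 5 seconds, per the review suggestion.
     static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
   
     /** Polls every {@code pollMillis} ms; returns true as soon as the condition holds. */
     static boolean waitUntil(BooleanSupplier condition, Duration timeout, long pollMillis)
         throws InterruptedException {
       long deadline = System.nanoTime() + timeout.toNanos();
       while (true) {
         if (condition.getAsBoolean()) {
           return true;
         }
         if (System.nanoTime() - deadline >= 0) {
           return false;  // timed out; the caller decides which exception to throw
         }
         Thread.sleep(pollMillis);
       }
     }
   }
   ```
   
   In the suite this could be used as `waitUntil(() -> tm.reloadCount >= count, WaitUtil.DEFAULT_TIMEOUT, 100)`, with the caller throwing if it returns `false`.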



##########
common/network-common/src/test/java/org/apache/spark/network/ssl/ReloadingX509TrustManagerSuite.java:
##########
@@ -0,0 +1,320 @@
+  /**
+   * Waits until reload count hits the requested value, sleeping 100ms at a time.
+   * If the maximum number of attempts is hit, throws a RuntimeException
+   * @param tm the trust manager to wait for
+   * @param count The count to wait for
+   * @param attempts The number of attempts to wait for
+   */
+  private void waitForReloadCount(ReloadingX509TrustManager tm, int count, int attempts)
+          throws InterruptedException {
+    if (tm.reloadCount > count) {
+      throw new RuntimeException(
+        "Passed invalid count " + count + " to waitForReloadCount, already have " + tm.reloadCount);
+    }
+    for (int i = 0; i < attempts; i++) {
+      if (tm.reloadCount >= count) {
+        return;
+      }
+      // Adapted from SystemClock.waitTillTime
+      long startTime = System.currentTimeMillis();
+      long targetTime = startTime + 100;
+      long currentTime = startTime;
+      while (currentTime < targetTime) {
+        long sleepTime = Math.min(10, targetTime - currentTime);
+        Thread.sleep(sleepTime);
+        currentTime = System.currentTimeMillis();
+      }
+    }
+    throw new RuntimeException("Trust store not reloaded after " + attempts + " attempts!");

Review Comment:
   nit: `RuntimeException` -> `IllegalStateException`





Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "hasnain-db (via GitHub)" <gi...@apache.org>.
hasnain-db commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1760701259

   @mridulm done - I can re-request review once tests pass (might take a try or 2)




Re: [PR] [SPARK-45426][CORE] Add support for a ReloadingX509TrustManager [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43249:
URL: https://github.com/apache/spark/pull/43249#issuecomment-1761811698

   The test failure is unrelated.
   Merged to master.
   Thanks for working on this @hasnain-db !

