You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/23 20:21:41 UTC
[pulsar] branch master updated: [pulsar-broker] Perform auto cert
refresh for Pulsar-admin (#8831)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98bf97e [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831)
98bf97e is described below
commit 98bf97e0765db30022597dd468afcc5227b417e8
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Dec 23 12:21:16 2020 -0800
[pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831)
### Motivation
We are frequently getting 500 on `pulsar-admin topics list <ns>` cli command. It happens because `pulsar-admin topics` rest-api internally uses `pulsar-admin` to get list of non-persistent topics. `PulsarAdmin-HttpClient` crates persistent connection but it doesn't perform auto-cert refresh so, if cert is expired and reconnection happens then broker always gets 500 when it uses `pulsar-admin` internally due to invalid certs.
```
21:09:16.025 [AsyncHttpClient-48-9] ERROR org.apache.pulsar.broker.admin.v1.NonPersistentTopics - [role] Failed to get list of topics under namespace prop/cluster/ns
java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException: java.net.ConnectException: error:10000416:SSL routines:OPENSSL_internal:SSLV3_ALERT_CERTIFICATE_UNKNOWN
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.pulsar.broker.admin.v1.NonPersistentTopics.lambda$getList$0(NonPersistentTopics.java:211) ~[pulsar-broker.jar:]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl$4.failed(NonPersistentTopicsImpl.java:215) ~[pulsar-client-admin-original.jar:]
at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[jersey-client-2.27.jar:?]
at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:231) ~[jersey-client-2.27.jar:?]
at org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:85) ~[jersey-client-2.27.jar:?]
at org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:183) ~[jersey-client-2.27.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272) [jersey-common-2.27.jar:?]
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268) [jersey-common-2.27.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:316) [jersey-common-2.27.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:298) [jersey-common-2.27.jar:?]
at org.glassfish.jersey.internal.Errors.process(Errors.java:268) [jersey-common-2.27.jar:?]
```
### Modification
Add Capability in HttpClient to perform auto-cert refresh to avoid any tls handshake failure.
---
.../pulsar/broker/admin/AdminApiTlsAuthTest.java | 60 ++++++++
.../apache/pulsar/client/admin/PulsarAdmin.java | 14 +-
.../pulsar/client/admin/PulsarAdminBuilder.java | 7 +
.../admin/internal/PulsarAdminBuilderImpl.java | 15 +-
.../admin/internal/http/AsyncHttpConnector.java | 12 +-
.../internal/http/AsyncHttpConnectorProvider.java | 12 +-
.../client/api/AuthenticationDataProvider.java | 15 ++
.../client/impl/auth/AuthenticationDataTls.java | 10 ++
.../common/util/FileModifiedTimeUpdater.java | 3 +
.../apache/pulsar/common/util/KeyManagerProxy.java | 152 +++++++++++++++++++++
.../apache/pulsar/common/util/SecurityUtility.java | 34 +++++
.../pulsar/common/util/TrustManagerProxy.java | 129 +++++++++++++++++
12 files changed, 446 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
index b623725..1cc110f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java
@@ -20,9 +20,15 @@ package org.apache.pulsar.broker.admin;
import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
@@ -34,6 +40,7 @@ import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -371,4 +378,57 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace("tenant1/ns1");
}
}
+
+ /**
+ * Validates Pulsar-admin performs auto cert refresh.
+ * @throws Exception
+ */
+ @Test
+ public void testCertRefreshForPulsarAdmin() throws Exception {
+ String adminUser = "admin";
+ String user2 = "user1";
+ File keyFile = new File(getTLSFile("temp" + ".key-pk8"));
+ Path keyFilePath = Paths.get(keyFile.getAbsolutePath());
+ int autoCertRefreshTimeSec = 1;
+ try {
+ Files.copy(Paths.get(getTLSFile(user2 + ".key-pk8")), keyFilePath, StandardCopyOption.REPLACE_EXISTING);
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .serviceHttpUrl(brokerUrlTls.toString())
+ .autoCertRefreshTime(1, TimeUnit.SECONDS)
+ .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
+ String.format("tlsCertFile:%s,tlsKeyFile:%s",
+ getTLSFile(adminUser + ".cert"), keyFile))
+ .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build();
+ // try to call admin-api which should fail due to incorrect key-cert
+ try {
+ admin.tenants().createTenant("tenantX",
+ new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
+ Assert.fail("should have failed due to invalid key file");
+ } catch (Exception e) {
+ //OK
+ }
+ // replace correct key file
+ Files.delete(keyFile.toPath());
+ Thread.sleep(2 * autoCertRefreshTimeSec * 1000);
+ Files.copy(Paths.get(getTLSFile(adminUser + ".key-pk8")), keyFilePath);
+ MutableBoolean success = new MutableBoolean(false);
+ retryStrategically((test) -> {
+ try {
+ admin.tenants().createTenant("tenantX",
+ new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
+ success.setValue(true);
+ return true;
+ }catch(Exception e) {
+ return false;
+ }
+ }, 5, 1000);
+ Assert.assertTrue(success.booleanValue());
+ Assert.assertEquals(ImmutableSet.of("tenantX"), admin.tenants().getTenants());
+ admin.close();
+ }finally {
+ Files.delete(keyFile.toPath());
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index f202397..7695e32 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -73,6 +73,7 @@ public class PulsarAdmin implements Closeable {
public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300;
+ public static final int DEFAULT_CERT_REFRESH_SECONDS = 300;
private final Clusters clusters;
private final Brokers brokers;
@@ -133,9 +134,8 @@ public class PulsarAdmin implements Closeable {
public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException {
this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS,
- DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS,
- DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, null);
-
+ DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS,
+ DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null);
}
public PulsarAdmin(String serviceUrl,
@@ -146,6 +146,8 @@ public class PulsarAdmin implements Closeable {
TimeUnit readTimeoutUnit,
int requestTimeout,
TimeUnit requestTimeoutUnit,
+ int autoCertRefreshTime,
+ TimeUnit autoCertRefreshTimeUnit,
ClassLoader clientBuilderClassLoader) throws PulsarClientException {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
@@ -166,7 +168,8 @@ public class PulsarAdmin implements Closeable {
clientConfigData.setServiceUrl(serviceUrl);
}
- AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData);
+ AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
+ (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
@@ -200,7 +203,8 @@ public class PulsarAdmin implements Closeable {
this.asyncHttpConnector = asyncConnectorProvider.getConnector(
Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
- Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)));
+ Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)),
+ (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));
long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index fc647e0..fda3694 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -252,6 +252,13 @@ public interface PulsarAdminBuilder {
PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit);
/**
+ * This sets auto cert refresh time if Pulsar admin uses tls authentication.
+ *
+ * @param autoCertRefreshTime
+ * @param autoCertRefreshTimeUnit
+ */
+ PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit);
+ /**
*
* @return
*/
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index b942cdb..75985c5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -35,16 +35,18 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS;
private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS;
+ private int autoCertRefreshTime = PulsarAdmin.DEFAULT_CERT_REFRESH_SECONDS;
private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit readTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
+ private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS;
private ClassLoader clientBuilderClassLoader = null;
@Override
public PulsarAdmin build() throws PulsarClientException {
- return new PulsarAdmin(conf.getServiceUrl(),
- conf, connectTimeout, connectTimeoutUnit, readTimeout, readTimeoutUnit,
- requestTimeout, requestTimeoutUnit, clientBuilderClassLoader);
+ return new PulsarAdmin(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout,
+ readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime,
+ autoCertRefreshTimeUnit, clientBuilderClassLoader);
}
public PulsarAdminBuilderImpl() {
@@ -168,6 +170,13 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
}
@Override
+ public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) {
+ this.autoCertRefreshTime = autoCertRefreshTime;
+ this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit;
+ return this;
+ }
+
+ @Override
public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader) {
this.clientBuilderClassLoader = clientBuilderClassLoader;
return this;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 1ff6e19..bc15f24 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -80,16 +80,18 @@ public class AsyncHttpConnector implements Connector {
private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
new DefaultThreadFactory("delayer"));
- public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+ public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
(int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
+ autoCertRefreshTimeSeconds,
conf);
}
@SneakyThrows
public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
- int requestTimeoutMs, ClientConfigurationData conf) {
+ int requestTimeoutMs,
+ int autoCertRefreshTimeSeconds, ClientConfigurationData conf) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
@@ -136,10 +138,10 @@ public class AsyncHttpConnector implements Connector {
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
sslCtx = authData.getTlsTrustStoreStream() == null
- ? SecurityUtility.createNettySslContextForClient(
+ ? SecurityUtility.createAutoRefreshSslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
- authData.getTlsPrivateKey())
+ conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(),
+ authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer)
: SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 33f2439..c73bdfd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -31,21 +31,25 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
private final ClientConfigurationData conf;
private Connector connector;
+ private final int autoCertRefreshTimeSeconds;
- public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
+ public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
this.conf = conf;
+ this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds;
}
@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
if (connector == null) {
- connector = new AsyncHttpConnector(client, conf);
+ connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds);
}
return connector;
}
- public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs) {
- return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, conf);
+ public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs,
+ int autoCertRefreshTimeSeconds) {
+ return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds,
+ conf);
}
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 87adba3..4624821 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -58,6 +58,13 @@ public interface AuthenticationDataProvider extends Serializable {
}
/**
+ * @return a client certificate file path
+ */
+ default String getTlsCerificateFilePath() {
+ return null;
+ }
+
+ /**
*
* @return a private key for the client certificate, or null if the data are not available
*/
@@ -67,6 +74,14 @@ public interface AuthenticationDataProvider extends Serializable {
/**
*
+ * @return a private key file path
+ */
+ default String getTlsPrivateKeyFilePath() {
+ return null;
+ }
+
+ /**
+ *
* @return an input-stream of the trust store, or null if the trust-store provided at
* {@link ClientConfigurationData#getTlsTrustStorePath()}
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index 588f614..3e9e903 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -129,5 +129,15 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null;
}
+ @Override
+ public String getTlsCerificateFilePath() {
+ return certFile != null ? certFile.getFileName() : null;
+ }
+
+ @Override
+ public String getTlsPrivateKeyFilePath() {
+ return keyFile != null ? keyFile.getFileName() : null;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
index d269624..99bdb8e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -24,12 +24,15 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import lombok.Getter;
+import lombok.ToString;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class working with file's modified time.
*/
+@ToString
public class FileModifiedTimeUpdater {
@Getter
String fileName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
new file mode 100644
index 0000000..b7b5173
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
+
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This class wraps {@link X509ExtendedKeyManager} and gives opportunity to refresh key-manager with refreshed certs
+ * without changing {@link SslContext}.
+ */
+@Slf4j
+public class KeyManagerProxy extends X509ExtendedKeyManager {
+
+ private static final char[] KEYSTORE_PASSWORD = "secret".toCharArray();
+ private volatile X509ExtendedKeyManager keyManager;
+ private FileModifiedTimeUpdater certFile, keyFile;
+
+ public KeyManagerProxy(String certFilePath, String keyFilePath, int refreshDurationSec,
+ ScheduledExecutorService executor) {
+ this.certFile = new FileModifiedTimeUpdater(certFilePath);
+ this.keyFile = new FileModifiedTimeUpdater(keyFilePath);
+ try {
+ updateKeyManager();
+ } catch (CertificateException e) {
+ log.warn("Failed to load cert {}", certFile, e);
+ throw new IllegalArgumentException(e);
+ } catch (KeyStoreException e) {
+ log.warn("Failed to load key {}", keyFile, e);
+ throw new IllegalArgumentException(e);
+ } catch (NoSuchAlgorithmException | UnrecoverableKeyException e) {
+ log.warn("Failed to update key Manager", e);
+ throw new IllegalArgumentException(e);
+ }
+ executor.scheduleWithFixedDelay(() -> updateKeyManagerSafely(), refreshDurationSec, refreshDurationSec,
+ TimeUnit.SECONDS);
+ }
+
+ public void updateKeyManagerSafely() {
+ try {
+ updateKeyManager();
+ } catch (Exception e) {
+ log.warn("Failed to update key Manager for {}, {}", certFile.getFileName(), keyFile.getFileName(), e);
+ }
+ }
+
+ public void updateKeyManager()
+ throws CertificateException, KeyStoreException, NoSuchAlgorithmException, UnrecoverableKeyException {
+ if (keyManager != null && !certFile.checkAndRefresh() && !keyFile.checkAndRefresh()) {
+ return;
+ }
+ log.info("refreshing key manager for {} {}", certFile.getFileName(), keyFile.getFileName());
+ X509Certificate certificate;
+ PrivateKey privateKey = null;
+ KeyStore keyStore;
+ try (InputStream publicCertStream = new FileInputStream(certFile.getFileName());
+ InputStream privateKeyStream = new FileInputStream(keyFile.getFileName())) {
+ final CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ certificate = (X509Certificate) cf.generateCertificate(publicCertStream);
+ keyStore = KeyStore.getInstance("JKS");
+ String alias = certificate.getSubjectX500Principal().getName();
+ privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
+ keyStore.load(null);
+ keyStore.setKeyEntry(alias, privateKey, KEYSTORE_PASSWORD, new X509Certificate[] { certificate });
+ } catch (IOException | KeyManagementException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory
+ .getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, KEYSTORE_PASSWORD);
+ this.keyManager = (X509ExtendedKeyManager) keyManagerFactory.getKeyManagers()[0];
+ }
+
+ @Override
+ public String[] getClientAliases(String s, Principal[] principals) {
+ return keyManager.getClientAliases(s, principals);
+ }
+
+ @Override
+ public String chooseClientAlias(String[] strings, Principal[] principals, Socket socket) {
+ return keyManager.chooseClientAlias(strings, principals, socket);
+ }
+
+ @Override
+ public String[] getServerAliases(String s, Principal[] principals) {
+ return keyManager.getServerAliases(s, principals);
+ }
+
+ @Override
+ public String chooseServerAlias(String s, Principal[] principals, Socket socket) {
+ return keyManager.chooseServerAlias(s, principals, socket);
+ }
+
+ @Override
+ public X509Certificate[] getCertificateChain(String s) {
+ return keyManager.getCertificateChain(s);
+ }
+
+ @Override
+ public PrivateKey getPrivateKey(String s) {
+ return keyManager.getPrivateKey(s);
+ }
+
+ @Override
+ public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
+ return keyManager.chooseEngineClientAlias(keyType, issuers, engine);
+ }
+
+ @Override
+ public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
+ return keyManager.chooseEngineServerAlias(keyType, issuers, engine);
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 5d0b896..8438f13 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -49,6 +49,8 @@ import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
@@ -72,6 +74,7 @@ public class SecurityUtility {
// also used to get Factories. e.g. CertificateFactory.getInstance("X.509", "BCFIPS")
public static final String BC_FIPS = "BCFIPS";
public static final String BC = "BC";
+ private static final String SSLCONTEXT_ALGORITHM = "TLSv1.2";
public static boolean isBCFIPS() {
return BC_PROVIDER.getClass().getCanonicalName().equals(BC_FIPS_PROVIDER_CLASS);
@@ -148,6 +151,37 @@ public class SecurityUtility {
return createSslContext(allowInsecureConnection, trustCertificates, certificates, privateKey);
}
+ /**
+ * Creates {@link SslContext} with capability to do auto-cert refresh.
+ * @param allowInsecureConnection
+ * @param trustCertsFilePath
+ * @param certFilePath
+ * @param keyFilePath
+ * @param sslContextAlgorithm
+ * @param refreshDurationSec
+ * @param executor
+ * @return
+ * @throws GeneralSecurityException
+ * @throws SSLException
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ public static SslContext createAutoRefreshSslContextForClient(boolean allowInsecureConnection,
+ String trustCertsFilePath, String certFilePath, String keyFilePath, String sslContextAlgorithm,
+ int refreshDurationSec, ScheduledExecutorService executor)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ KeyManagerProxy keyManager = new KeyManagerProxy(certFilePath, keyFilePath, refreshDurationSec, executor);
+ SslContextBuilder sslContexBuilder = SslContextBuilder.forClient();
+ sslContexBuilder.keyManager(keyManager);
+ if (allowInsecureConnection) {
+ sslContexBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ } else {
+ TrustManagerProxy trustManager = new TrustManagerProxy(trustCertsFilePath, refreshDurationSec, executor);
+ sslContexBuilder.trustManager(trustManager);
+ }
+ return sslContexBuilder.build();
+ }
+
public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath,
String certFilePath, String keyFilePath)
throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
new file mode 100644
index 0000000..7edbbb4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+
+import io.netty.handler.ssl.SslContext;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This class wraps {@link X509ExtendedTrustManager} and gives opportunity to refresh Trust-manager with refreshed certs
+ * without changing {@link SslContext}.
+ */
+@Slf4j
+public class TrustManagerProxy extends X509ExtendedTrustManager {
+
+ private volatile X509ExtendedTrustManager trustManager;
+ private FileModifiedTimeUpdater certFile;
+
+ public TrustManagerProxy(String caCertFile, int refreshDurationSec, ScheduledExecutorService executor) {
+ this.certFile = new FileModifiedTimeUpdater(caCertFile);
+ try {
+ updateTrustManager();
+ } catch (IOException | CertificateException e) {
+ log.warn("Failed to load cert {}, {}", certFile, e.getMessage());
+ throw new IllegalArgumentException(e);
+ } catch (NoSuchAlgorithmException | KeyStoreException e) {
+ log.warn("Failed to init trust-store", e);
+ throw new IllegalArgumentException(e);
+ }
+ executor.scheduleWithFixedDelay(() -> updateTrustManagerSafely(), refreshDurationSec, refreshDurationSec,
+ TimeUnit.SECONDS);
+ }
+
+ private void updateTrustManagerSafely() {
+ try {
+ updateTrustManager();
+ } catch (Exception e) {
+ log.warn("Failed to init trust-store {}", certFile.getFileName(), e);
+ }
+ }
+
+ private void updateTrustManager() throws CertificateException, KeyStoreException, NoSuchAlgorithmException,
+ FileNotFoundException, IOException {
+ CertificateFactory factory = CertificateFactory.getInstance("X.509");
+ try (InputStream inputStream = new FileInputStream(certFile.getFileName())) {
+ X509Certificate certificate = (X509Certificate) factory.generateCertificate(inputStream);
+ String alias = certificate.getSubjectX500Principal().getName();
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(null);
+ keyStore.setCertificateEntry(alias, certificate);
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory
+ .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(keyStore);
+ trustManager = (X509ExtendedTrustManager) trustManagerFactory.getTrustManagers()[0];
+ }
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ trustManager.checkClientTrusted(x509Certificates, s);
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ trustManager.checkServerTrusted(x509Certificates, s);
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return trustManager.getAcceptedIssuers();
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket)
+ throws CertificateException {
+ trustManager.checkClientTrusted(chain, authType, socket);
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
+ throws CertificateException {
+ trustManager.checkClientTrusted(chain, authType, engine);
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket)
+ throws CertificateException {
+ trustManager.checkServerTrusted(chain, authType, socket);
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
+ throws CertificateException {
+ trustManager.checkServerTrusted(chain, authType, engine);
+ }
+}