You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:23 UTC
[pulsar] 37/38: [pulsar-client] Add support to load tls certs/key
dynamically from inputstream (#6760)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77c7f1cf9197f135e3ceb1e596a682fead9b8a37
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 21 17:05:34 2020 -0700
[pulsar-client] Add support to load tls certs/key dynamically from inputstream (#6760)
### Motivation
Right now, Pulsar-client provides tls authentication support and default TLS provider `AuthenticationTls` expects file path of cert and key files. However, there are usescases where it will be difficult for user-applications to store certs/key file locally for tls authentication.
eg:
1. Applications running on docker or K8s containers will not have certs at defined location and app uses KMS or various key-vault system whose API return streams of certs.
2. Operationally hard to manage key rotation in containers
3. Need to avoid storing key/trust store files on file system for stronger security
Therefore, it's good to have mechanism in default `AuthenticationTls` provider to read certs from memory/stream without storing certs on file-system.
### Modification
Add Stream support in `AuthenticationTls` to provide X509Certs and PrivateKey which also performs auto-refresh when stream changes in a given provider.
```
AuthenticationTls auth = new AuthenticationTls(certStreamProvider, keyStreamProvider);
```
It will be also address: #5241
(cherry picked from commit 3b48df1577a43509e1fa9afb01243dd87ea8026e)
---
.../pulsar/client/api/TlsProducerConsumerTest.java | 111 +++++++++++++++++++++
.../client/impl/auth/AuthenticationDataTls.java | 47 ++++++++-
.../pulsar/client/impl/auth/AuthenticationTls.java | 16 ++-
.../client/impl/conf/ClientConfigurationData.java | 3 +
.../apache/pulsar/common/util/SecurityUtility.java | 46 ++++++++-
5 files changed, 214 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 82aa7d3..55bc4a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,14 +18,26 @@
*/
package org.apache.pulsar.client.api;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
+import lombok.Cleanup;
+
public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);
@@ -124,4 +136,103 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
Assert.fail("Should not fail since certs are sent.");
}
}
+
+ @Test(timeOut = 60000)
+ public void testTlsCertsFromDynamicStream() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ AtomicInteger index = new AtomicInteger(0);
+
+ ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
+ ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+
+ Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream);
+ Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream);
+ AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+ clientBuilder.authentication(auth);
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ .subscribe();
+
+ // unload the topic so, new connection will be made and read the cert streams again
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ topicRef.close(false);
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+ .createAsync().get(30, TimeUnit.SECONDS);
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test" + i).getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String exepctedMsg = "test" + i;
+ Assert.assertEquals(exepctedMsg.getBytes(), msg.getData());
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ /**
+ * It verifies that AuthenticationTls provides cert refresh functionality.
+ *
+ * <pre>
+ * a. Create Auth with invalid cert
+ * b. Consumer fails with invalid tls certs
+ * c. refresh cert in provider
+ * d. Consumer successfully gets created
+ * </pre>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ AtomicInteger certIndex = new AtomicInteger(1);
+ AtomicInteger keyIndex = new AtomicInteger(0);
+ ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
+ ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+ Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream,
+ keyStream/* invalid cert file */);
+ Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream);
+ AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+ clientBuilder.authentication(auth);
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+ Consumer<byte[]> consumer = null;
+ try {
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ Assert.fail("should have failed due to invalid tls cert");
+ } catch (PulsarClientException e) {
+ // Ok..
+ }
+
+ certIndex.set(0);
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ private ByteArrayInputStream createByteInputStream(String filePath) throws IOException {
+ InputStream inStream = new FileInputStream(filePath);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(inStream, baos);
+ return new ByteArrayInputStream(baos.toByteArray());
+ }
+
+ private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
+ return streams[index.intValue()];
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index e355672..0d3df12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -18,11 +18,17 @@
*/
package org.apache.pulsar.client.impl.auth;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
+import java.util.function.Supplier;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
import org.apache.pulsar.common.util.SecurityUtility;
@@ -32,7 +38,10 @@ import org.slf4j.LoggerFactory;
public class AuthenticationDataTls implements AuthenticationDataProvider {
protected X509Certificate[] tlsCertificates;
protected PrivateKey tlsPrivateKey;
- protected FileModifiedTimeUpdater certFile, keyFile;
+ private FileModifiedTimeUpdater certFile, keyFile;
+ // key and cert using stream
+ private InputStream certStream, keyStream;
+ private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
if (certFilePath == null) {
@@ -47,6 +56,22 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFilePath);
}
+ public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
+ Supplier<ByteArrayInputStream> keyStreamProvider) throws KeyManagementException {
+ if (certStreamProvider == null || certStreamProvider.get() == null) {
+ throw new IllegalArgumentException("certStream provider or stream must not be null");
+ }
+ if (keyStreamProvider == null || keyStreamProvider.get() == null) {
+ throw new IllegalArgumentException("keyStream provider or stream must not be null");
+ }
+ this.certStreamProvider = certStreamProvider;
+ this.keyStreamProvider = keyStreamProvider;
+ this.certStream = certStreamProvider.get();
+ this.keyStream = keyStreamProvider.get();
+ this.tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
+ this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
+ }
+
/*
* TLS
*/
@@ -58,24 +83,40 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
@Override
public Certificate[] getTlsCertificates() {
- if (this.certFile.checkAndRefresh()) {
+ if (certFile != null && certFile.checkAndRefresh()) {
try {
this.tlsCertificates = SecurityUtility.loadCertificatesFromPemFile(certFile.getFileName());
} catch (KeyManagementException e) {
LOG.error("Unable to refresh authData for cert {}: ", certFile.getFileName(), e);
}
+ } else if (certStreamProvider != null && certStreamProvider.get() != null
+ && !certStreamProvider.get().equals(certStream)) {
+ try {
+ certStream = certStreamProvider.get();
+ tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
+ } catch (KeyManagementException e) {
+ LOG.error("Unable to refresh authData from cert stream ", e);
+ }
}
return this.tlsCertificates;
}
@Override
public PrivateKey getTlsPrivateKey() {
- if (this.keyFile.checkAndRefresh()) {
+ if (keyFile != null && keyFile.checkAndRefresh()) {
try {
this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
} catch (KeyManagementException e) {
LOG.error("Unable to refresh authData for cert {}: ", keyFile.getFileName(), e);
}
+ } else if (keyStreamProvider != null && keyStreamProvider.get() != null
+ && !keyStreamProvider.get().equals(keyStream)) {
+ try {
+ keyStream = keyStreamProvider.get();
+ tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
+ } catch (KeyManagementException e) {
+ LOG.error("Unable to refresh authData from key stream ", e);
+ }
}
return this.tlsPrivateKey;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index d75e491..22cd2f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.client.impl.auth;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -42,6 +45,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
private String certFilePath;
private String keyFilePath;
+ private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
public AuthenticationTls() {
}
@@ -51,6 +55,11 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
this.keyFilePath = keyFilePath;
}
+ public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) {
+ this.certStreamProvider = certStreamProvider;
+ this.keyStreamProvider = keyStreamProvider;
+ }
+
@Override
public void close() throws IOException {
// noop
@@ -64,10 +73,15 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
@Override
public AuthenticationDataProvider getAuthData() throws PulsarClientException {
try {
- return new AuthenticationDataTls(certFilePath, keyFilePath);
+ if (certFilePath != null && keyFilePath != null) {
+ return new AuthenticationDataTls(certFilePath, keyFilePath);
+ } else if (certStreamProvider != null && keyStreamProvider != null) {
+ return new AuthenticationDataTls(certStreamProvider, keyStreamProvider);
+ }
} catch (Exception e) {
throw new PulsarClientException(e);
}
+ throw new IllegalArgumentException("cert/key file path or cert/key stream must be present");
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index af478ce..af6ad8d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -80,6 +80,9 @@ public class ClientConfigurationData implements Serializable, Cloneable {
return authentication;
}
+ public void setAuthentication(Authentication authentication) {
+ this.authentication = authentication;
+ }
public boolean isUseTls() {
if (useTls)
return true;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 648e9f2..8d1af4a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -26,8 +26,9 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
-import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyManagementException;
@@ -40,6 +41,7 @@ import java.security.SecureRandom;
import java.security.Security;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.spec.KeySpec;
@@ -242,9 +244,7 @@ public class SecurityUtility {
}
try (FileInputStream input = new FileInputStream(certFilePath)) {
- CertificateFactory cf = CertificateFactory.getInstance("X.509");
- Collection<X509Certificate> collection = (Collection<X509Certificate>) cf.generateCertificates(input);
- certificates = collection.toArray(new X509Certificate[collection.size()]);
+ certificates = loadCertificatesFromPemStream(input);
} catch (GeneralSecurityException | IOException e) {
throw new KeyManagementException("Certificate loading error", e);
}
@@ -252,6 +252,23 @@ public class SecurityUtility {
return certificates;
}
+ public static X509Certificate[] loadCertificatesFromPemStream(InputStream inStream) throws KeyManagementException {
+ if (inStream == null) {
+ return null;
+ }
+ CertificateFactory cf;
+ try {
+ if (inStream.markSupported()) {
+ inStream.reset();
+ }
+ cf = CertificateFactory.getInstance("X.509");
+ Collection<X509Certificate> collection = (Collection<X509Certificate>) cf.generateCertificates(inStream);
+ return collection.toArray(new X509Certificate[collection.size()]);
+ } catch (CertificateException | IOException e) {
+ throw new KeyManagementException("Certificate loading error", e);
+ }
+ }
+
public static PrivateKey loadPrivateKeyFromPemFile(String keyFilePath) throws KeyManagementException {
PrivateKey privateKey = null;
@@ -259,7 +276,26 @@ public class SecurityUtility {
return privateKey;
}
- try (BufferedReader reader = new BufferedReader(new FileReader(keyFilePath))) {
+ try (FileInputStream input = new FileInputStream(keyFilePath)) {
+ privateKey = loadPrivateKeyFromPemStream(input);
+ } catch (IOException e) {
+ throw new KeyManagementException("Private key loading error", e);
+ }
+
+ return privateKey;
+ }
+
+ public static PrivateKey loadPrivateKeyFromPemStream(InputStream inStream) throws KeyManagementException {
+ PrivateKey privateKey = null;
+
+ if (inStream == null) {
+ return privateKey;
+ }
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(inStream))) {
+ if (inStream.markSupported()) {
+ inStream.reset();
+ }
StringBuilder sb = new StringBuilder();
String previousLine = "";
String currentLine = null;