You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:23 UTC

[pulsar] 37/38: [pulsar-client] Add support to load tls certs/key dynamically from inputstream (#6760)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77c7f1cf9197f135e3ceb1e596a682fead9b8a37
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Apr 21 17:05:34 2020 -0700

    [pulsar-client] Add support to load tls certs/key dynamically from inputstream (#6760)
    
    ### Motivation
    Right now, Pulsar-client provides TLS authentication support, and the default TLS provider `AuthenticationTls` expects the file paths of the cert and key files. However, there are use cases where it is difficult for user applications to store cert/key files locally for TLS authentication.
    e.g.:
    1. Applications running in Docker or K8s containers will not have certs at a defined location, and the app uses a KMS or another key-vault system whose API returns streams of certs.
    2. It is operationally hard to manage key rotation in containers.
    3. There is a need to avoid storing key/trust store files on the file system for stronger security.
    
    Therefore, it's good to have a mechanism in the default `AuthenticationTls` provider to read certs from memory/stream without storing them on the file system.
    
    ### Modification
    Add stream support in `AuthenticationTls` to provide the X509 certs and private key; it also auto-refreshes them when the stream returned by a given provider changes.
    ```
    AuthenticationTls auth = new AuthenticationTls(certStreamProvider, keyStreamProvider);
    ```
    It will also address #5241.
    (cherry picked from commit 3b48df1577a43509e1fa9afb01243dd87ea8026e)
---
 .../pulsar/client/api/TlsProducerConsumerTest.java | 111 +++++++++++++++++++++
 .../client/impl/auth/AuthenticationDataTls.java    |  47 ++++++++-
 .../pulsar/client/impl/auth/AuthenticationTls.java |  16 ++-
 .../client/impl/conf/ClientConfigurationData.java  |   3 +
 .../apache/pulsar/common/util/SecurityUtility.java |  46 ++++++++-
 5 files changed, 214 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 82aa7d3..55bc4a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,14 +18,26 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import lombok.Cleanup;
+
 public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);
 
@@ -124,4 +136,103 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
             Assert.fail("Should not fail since certs are sent.");
         }
     }
+
+    @Test(timeOut = 60000)
+    public void testTlsCertsFromDynamicStream() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        AtomicInteger index = new AtomicInteger(0);
+
+        ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
+        ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+
+        Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream);
+        Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream);
+        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+        clientBuilder.authentication(auth);
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscribe();
+
+        // unload the topic so, new connection will be made and read the cert streams again
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        topicRef.close(false);
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+                .createAsync().get(30, TimeUnit.SECONDS);
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test" + i).getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String exepctedMsg = "test" + i;
+            Assert.assertEquals(exepctedMsg.getBytes(), msg.getData());
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * It verifies that AuthenticationTls provides cert refresh functionality.
+     * 
+     * <pre>
+     *  a. Create Auth with invalid cert
+     *  b. Consumer fails with invalid tls certs
+     *  c. refresh cert in provider
+     *  d. Consumer successfully gets created
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        AtomicInteger certIndex = new AtomicInteger(1);
+        AtomicInteger keyIndex = new AtomicInteger(0);
+        ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
+        ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+        Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream,
+                keyStream/* invalid cert file */);
+        Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream);
+        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+        clientBuilder.authentication(auth);
+        @Cleanup
+        PulsarClient pulsarClient = clientBuilder.build();
+        Consumer<byte[]> consumer = null;
+        try {
+            consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+                    .subscriptionName("my-subscriber-name").subscribe();
+            Assert.fail("should have failed due to invalid tls cert");
+        } catch (PulsarClientException e) {
+            // Ok..
+        }
+
+        certIndex.set(0);
+        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("my-subscriber-name").subscribe();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    private ByteArrayInputStream createByteInputStream(String filePath) throws IOException {
+        InputStream inStream = new FileInputStream(filePath);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(inStream, baos);
+        return new ByteArrayInputStream(baos.toByteArray());
+    }
+
+    private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
+        return streams[index.intValue()];
+    } 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index e355672..0d3df12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -18,11 +18,17 @@
  */
 package org.apache.pulsar.client.impl.auth;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.security.KeyManagementException;
 import java.security.PrivateKey;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
+import java.util.function.Supplier;
 
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
 import org.apache.pulsar.common.util.SecurityUtility;
@@ -32,7 +38,10 @@ import org.slf4j.LoggerFactory;
 public class AuthenticationDataTls implements AuthenticationDataProvider {
     protected X509Certificate[] tlsCertificates;
     protected PrivateKey tlsPrivateKey;
-    protected FileModifiedTimeUpdater certFile, keyFile;
+    private FileModifiedTimeUpdater certFile, keyFile;
+    // key and cert using stream
+    private InputStream certStream, keyStream;
+    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
 
     public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
         if (certFilePath == null) {
@@ -47,6 +56,22 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
         this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFilePath);
     }
 
+    public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
+            Supplier<ByteArrayInputStream> keyStreamProvider) throws KeyManagementException {
+        if (certStreamProvider == null || certStreamProvider.get() == null) {
+            throw new IllegalArgumentException("certStream provider or stream must not be null");
+        }
+        if (keyStreamProvider == null || keyStreamProvider.get() == null) {
+            throw new IllegalArgumentException("keyStream provider or stream must not be null");
+        }
+        this.certStreamProvider = certStreamProvider;
+        this.keyStreamProvider = keyStreamProvider;
+        this.certStream = certStreamProvider.get();
+        this.keyStream = keyStreamProvider.get();
+        this.tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
+        this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
+    }
+
     /*
      * TLS
      */
@@ -58,24 +83,40 @@ public class AuthenticationDataTls implements AuthenticationDataProvider {
 
     @Override
     public Certificate[] getTlsCertificates() {
-        if (this.certFile.checkAndRefresh()) {
+        if (certFile != null && certFile.checkAndRefresh()) {
             try {
                 this.tlsCertificates = SecurityUtility.loadCertificatesFromPemFile(certFile.getFileName());
             } catch (KeyManagementException e) {
                 LOG.error("Unable to refresh authData for cert {}: ", certFile.getFileName(), e);
             }
+        } else if (certStreamProvider != null && certStreamProvider.get() != null
+                && !certStreamProvider.get().equals(certStream)) {
+            try {
+                certStream = certStreamProvider.get();
+                tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
+            } catch (KeyManagementException e) {
+                LOG.error("Unable to refresh authData from cert stream ", e);
+            }
         }
         return this.tlsCertificates;
     }
 
     @Override
     public PrivateKey getTlsPrivateKey() {
-        if (this.keyFile.checkAndRefresh()) {
+        if (keyFile != null && keyFile.checkAndRefresh()) {
             try {
                 this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName());
             } catch (KeyManagementException e) {
                 LOG.error("Unable to refresh authData for cert {}: ", keyFile.getFileName(), e);
             }
+        } else if (keyStreamProvider != null && keyStreamProvider.get() != null
+                && !keyStreamProvider.get().equals(keyStream)) {
+            try {
+                keyStream = keyStreamProvider.get();
+                tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
+            } catch (KeyManagementException e) {
+                LOG.error("Unable to refresh authData from key stream ", e);
+            }
         }
         return this.tlsPrivateKey;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index d75e491..22cd2f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.client.impl.auth;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -42,6 +45,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
 
     private String certFilePath;
     private String keyFilePath;
+    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
 
     public AuthenticationTls() {
     }
@@ -51,6 +55,11 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
         this.keyFilePath = keyFilePath;
     }
 
+    public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) {
+        this.certStreamProvider = certStreamProvider;
+        this.keyStreamProvider = keyStreamProvider;
+    }
+
     @Override
     public void close() throws IOException {
         // noop
@@ -64,10 +73,15 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
     @Override
     public AuthenticationDataProvider getAuthData() throws PulsarClientException {
         try {
-            return new AuthenticationDataTls(certFilePath, keyFilePath);
+            if (certFilePath != null && keyFilePath != null) {
+                return new AuthenticationDataTls(certFilePath, keyFilePath);
+            } else if (certStreamProvider != null && keyStreamProvider != null) {
+                return new AuthenticationDataTls(certStreamProvider, keyStreamProvider);
+            }
         } catch (Exception e) {
             throw new PulsarClientException(e);
         }
+        throw new IllegalArgumentException("cert/key file path or cert/key stream must be present");
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index af478ce..af6ad8d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -80,6 +80,9 @@ public class ClientConfigurationData implements Serializable, Cloneable {
         return authentication;
     }
 
+    public void setAuthentication(Authentication authentication) {
+        this.authentication = authentication;
+    }
     public boolean isUseTls() {
         if (useTls)
             return true;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 648e9f2..8d1af4a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -26,8 +26,9 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.security.GeneralSecurityException;
 import java.security.KeyFactory;
 import java.security.KeyManagementException;
@@ -40,6 +41,7 @@ import java.security.SecureRandom;
 import java.security.Security;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
 import java.security.spec.KeySpec;
@@ -242,9 +244,7 @@ public class SecurityUtility {
         }
 
         try (FileInputStream input = new FileInputStream(certFilePath)) {
-            CertificateFactory cf = CertificateFactory.getInstance("X.509");
-            Collection<X509Certificate> collection = (Collection<X509Certificate>) cf.generateCertificates(input);
-            certificates = collection.toArray(new X509Certificate[collection.size()]);
+            certificates = loadCertificatesFromPemStream(input);
         } catch (GeneralSecurityException | IOException e) {
             throw new KeyManagementException("Certificate loading error", e);
         }
@@ -252,6 +252,23 @@ public class SecurityUtility {
         return certificates;
     }
 
+    public static X509Certificate[] loadCertificatesFromPemStream(InputStream inStream) throws KeyManagementException  {
+        if (inStream == null) {
+            return null;
+        }
+        CertificateFactory cf;
+        try {
+            if (inStream.markSupported()) {
+                inStream.reset();
+            }
+            cf = CertificateFactory.getInstance("X.509");
+            Collection<X509Certificate> collection = (Collection<X509Certificate>) cf.generateCertificates(inStream);
+            return collection.toArray(new X509Certificate[collection.size()]);
+        } catch (CertificateException | IOException e) {
+            throw new KeyManagementException("Certificate loading error", e);
+        }
+    }
+
     public static PrivateKey loadPrivateKeyFromPemFile(String keyFilePath) throws KeyManagementException {
         PrivateKey privateKey = null;
 
@@ -259,7 +276,26 @@ public class SecurityUtility {
             return privateKey;
         }
 
-        try (BufferedReader reader = new BufferedReader(new FileReader(keyFilePath))) {
+        try (FileInputStream input = new FileInputStream(keyFilePath)) {
+            privateKey = loadPrivateKeyFromPemStream(input);
+        } catch (IOException e) {
+            throw new KeyManagementException("Private key loading error", e);
+        }
+
+        return privateKey;
+    }
+
+    public static PrivateKey loadPrivateKeyFromPemStream(InputStream inStream) throws KeyManagementException {
+        PrivateKey privateKey = null;
+
+        if (inStream == null) {
+            return privateKey;
+        }
+
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inStream))) {
+            if (inStream.markSupported()) {
+                inStream.reset();
+            }
             StringBuilder sb = new StringBuilder();
             String previousLine = "";
             String currentLine = null;