You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:24 UTC
[pulsar] 38/38: Add Tls with keystore type config support (#6853)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9e72dfb7f88f529ad264974fa277894926b4a6af
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri May 8 15:41:19 2020 +0800
Add Tls with keystore type config support (#6853)
Fixes #6640
Add Tls with keystore type config.
Add Tls with keystore type config.
- Unit test passed
(cherry picked from commit 367ce7829827e4b0853e1f5a50566192bb82bf54)
---
conf/broker.conf | 56 ++++
conf/client.conf | 11 +
conf/standalone.conf | 100 +++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 98 ++++++
.../authentication/AuthenticationDataHttps.java | 1 -
.../OneStageAuthenticationState.java | 2 +-
.../java/org/apache/pulsar/PulsarStandalone.java | 25 +-
.../org/apache/pulsar/broker/PulsarService.java | 30 +-
.../pulsar/broker/service/BrokerService.java | 20 +-
.../broker/service/PulsarChannelInitializer.java | 62 ++--
.../org/apache/pulsar/broker/web/WebService.java | 35 +-
.../pulsar/client/api/TlsProducerConsumerTest.java | 6 +-
.../client/impl/AdminApiKeyStoreTlsAuthTest.java | 229 +++++++++++++
.../KeyStoreTlsProducerConsumerTestWithAuth.java | 267 ++++++++++++++++
...KeyStoreTlsProducerConsumerTestWithoutAuth.java | 255 +++++++++++++++
.../apache/pulsar/client/impl/KeyStoreTlsTest.java | 80 +++++
.../authentication/keystoretls/broker.keystore.jks | Bin 0 -> 2767 bytes
.../keystoretls/broker.truststore.jks | Bin 0 -> 731 bytes
.../keystoretls/brokerKeyStorePW.txt | 1 +
.../keystoretls/brokerTrustStorePW.txt | 1 +
.../authentication/keystoretls/client.keystore.jks | Bin 0 -> 2767 bytes
.../keystoretls/client.truststore.jks | Bin 0 -> 731 bytes
.../keystoretls/clientKeyStorePW.txt | 1 +
.../keystoretls/clientTrustStorePW.txt | 1 +
.../pulsar/client/admin/PulsarAdminBuilder.java | 60 +++-
.../admin/internal/PulsarAdminBuilderImpl.java | 49 ++-
.../admin/internal/http/AsyncHttpConnector.java | 46 ++-
.../client/api/AuthenticationDataProvider.java | 9 +
.../apache/pulsar/client/api/ClientBuilder.java | 64 ++++
.../apache/pulsar/client/api/KeyStoreParams.java | 41 +--
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 22 +-
.../apache/pulsar/client/cli/PulsarClientTool.java | 18 ++
.../pulsar/client/impl/ClientBuilderImpl.java | 47 ++-
.../org/apache/pulsar/client/impl/HttpClient.java | 68 ++--
.../pulsar/client/impl/HttpLookupService.java | 3 +-
.../client/impl/PulsarChannelInitializer.java | 39 ++-
.../impl/auth/AuthenticationDataKeyStoreTls.java | 32 +-
.../impl/auth/AuthenticationKeyStoreTls.java | 136 ++++++++
.../pulsar/client/impl/auth/AuthenticationTls.java | 4 +-
.../client/impl/conf/ClientConfigurationData.java | 12 +
pulsar-common/pom.xml | 4 +
.../common/util/ClientSslContextRefresher.java | 67 ----
.../common/util/DefaultSslContextBuilder.java | 18 +-
.../util/NettyClientSslContextRefresher.java | 74 +++++
...lder.java => NettyServerSslContextBuilder.java} | 33 +-
.../common/util/SslContextAutoRefreshBuilder.java | 46 +--
.../util/keystoretls/KeyStoreSSLContext.java | 355 +++++++++++++++++++++
.../util/keystoretls/NetSslContextBuilder.java | 92 ++++++
.../NettySSLContextAutoRefreshBuilder.java | 144 +++++++++
.../keystoretls/SSLContextValidatorEngine.java | 176 ++++++++++
.../SslContextFactoryWithAutoRefresh.java | 63 ++++
.../common/util/keystoretls/package-info.java | 34 +-
.../src/test/resources/broker.keystore.jks | Bin 0 -> 2767 bytes
.../src/test/resources/broker.truststore.jks | Bin 0 -> 731 bytes
.../src/test/resources/brokerKeyStorePW.txt | 1 +
.../src/test/resources/brokerTrustStorePW.txt | 1 +
pulsar-common/src/test/resources/ca-cert | 16 +
pulsar-common/src/test/resources/ca-cert.srl | 1 +
pulsar-common/src/test/resources/ca-key | 30 ++
pulsar-common/src/test/resources/cert-file | 17 +
pulsar-common/src/test/resources/cert-signed | 22 ++
.../src/test/resources/client.keystore.jks | Bin 0 -> 2767 bytes
.../src/test/resources/client.truststore.jks | Bin 0 -> 731 bytes
.../src/test/resources/clientKeyStorePW.txt | 1 +
.../src/test/resources/clientTrustStorePW.txt | 1 +
.../src/test/resources/old/broker.keystore.jks | Bin 0 -> 2928 bytes
.../src/test/resources/old/broker.truststore.jks | Bin 0 -> 797 bytes
.../src/test/resources/old/brokerKeyStorePW.txt | 1 +
.../src/test/resources/old/brokerTrustStorePW.txt | 1 +
.../src/test/resources/old/client.keystore.jks | Bin 0 -> 2926 bytes
.../src/test/resources/old/client.truststore.jks | Bin 0 -> 797 bytes
.../src/test/resources/old/clientKeyStorePW.txt | 1 +
.../src/test/resources/old/clientTrustStorePW.txt | 1 +
.../service/ServiceChannelInitializer.java | 47 ++-
.../discovery/service/server/ServerManager.java | 35 +-
.../discovery/service/server/ServiceConfig.java | 222 ++-----------
.../pulsar/proxy/server/DirectProxyHandler.java | 14 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 104 +++++-
.../pulsar/proxy/server/ProxyConnection.java | 21 +-
.../proxy/server/ServiceChannelInitializer.java | 90 +++++-
.../org/apache/pulsar/proxy/server/WebServer.java | 37 ++-
.../proxy/server/ProxyKeyStoreTlsTestWithAuth.java | 202 ++++++++++++
.../server/ProxyKeyStoreTlsTestWithoutAuth.java | 186 +++++++++++
.../authentication/keystoretls/broker.keystore.jks | Bin 0 -> 2767 bytes
.../keystoretls/broker.truststore.jks | Bin 0 -> 731 bytes
.../keystoretls/brokerKeyStorePW.txt | 1 +
.../keystoretls/brokerTrustStorePW.txt | 1 +
.../authentication/keystoretls/client.keystore.jks | Bin 0 -> 2767 bytes
.../keystoretls/client.truststore.jks | Bin 0 -> 731 bytes
.../keystoretls/clientKeyStorePW.txt | 1 +
.../keystoretls/clientTrustStorePW.txt | 1 +
site2/docs/reference-configuration.md | 12 +
92 files changed, 3566 insertions(+), 539 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b1e6c1a..d0c942a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -389,6 +389,62 @@ tlsCiphers=
# authentication.
tlsRequireTrustedClientCertOnConnect=false
+### --- KeyStore TLS config variables --- ###
+# Enable TLS with KeyStore type configuration in broker.
+tlsEnabledWithKeyStore=false
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# TLS KeyStore type configuration in broker: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in broker
+tlsKeyStore=
+
+# TLS KeyStore password for broker
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in broker: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in broker
+tlsTrustStore=
+
+# TLS TrustStore password in broker
+tlsTrustStorePassword=
+
+# Whether internal client use KeyStore type to authenticate with Pulsar brokers
+brokerClientTlsEnabledWithKeyStore=false
+
+# The TLS Provider used by internal client to authenticate with other Pulsar brokers
+brokerClientSslProvider=
+
+# TLS TrustStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStoreType=JKS
+
+# TLS TrustStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStore=
+
+# TLS TrustStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStorePassword=
+
+# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers)
+# e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsCiphers=
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# e.g. [TLSv1.2, TLSv1.1, TLSv1]
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsProtocols=
+
+
### --- Authentication --- ###
# Enable authentication
diff --git a/conf/client.conf b/conf/client.conf
index 887785a..597478e 100644
--- a/conf/client.conf
+++ b/conf/client.conf
@@ -56,3 +56,14 @@ tlsEnableHostnameVerification=false
# fails, then the cert is untrusted and the connection is dropped.
tlsTrustCertsFilePath=
+# Enable TLS with KeyStore type configuration in broker.
+useKeyStoreTls=false;
+
+# TLS KeyStore type configuration: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path
+tlsTrustStorePath=
+
+# TLS TrustStore password
+tlsTrustStorePassword=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7dff0c3..3d6c12e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -169,8 +169,8 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0
-# Dispatch rate-limiting relative to publish rate.
-# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
+# Dispatch rate-limiting relative to publish rate.
+# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
dispatchThrottlingRateRelativeToPublishRate=false
@@ -211,6 +211,102 @@ maxConsumersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0
+### --- TLS --- ###
+# Deprecated - Use webServicePortTls and brokerServicePortTls instead
+tlsEnabled=false
+
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
+tlsCertRefreshCheckDurationSec=300
+
+# Path for the TLS certificate file
+tlsCertificateFilePath=
+
+# Path for the TLS private key file
+tlsKeyFilePath=
+
+# Path for the trusted TLS certificate file.
+# This cert is used to verify that any certs presented by connecting clients
+# are signed by a certificate authority. If this verification
+# fails, then the certs are untrusted and the connections are dropped.
+tlsTrustCertsFilePath=
+
+# Accept untrusted TLS certificate from client.
+# If true, a client with a cert which cannot be verified with the
+# 'tlsTrustCertsFilePath' cert will allowed to connect to the server,
+# though the cert will not be used for client authentication.
+tlsAllowInsecureConnection=false
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
+tlsProtocols=
+
+# Specify the tls cipher the broker will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+tlsCiphers=
+
+# Trusted client certificates are required for to connect TLS
+# Reject the Connection if the Client Certificate is not trusted.
+# In effect, this requires that all connecting clients perform TLS client
+# authentication.
+tlsRequireTrustedClientCertOnConnect=false
+
+### --- KeyStore TLS config variables --- ###
+# Enable TLS with KeyStore type configuration in broker.
+tlsEnabledWithKeyStore=false
+
+# TLS Provider for KeyStore type
+tlsProvider=
+
+# TLS KeyStore type configuration in broker: JKS, PKCS12
+tlsKeyStoreType=JKS
+
+# TLS KeyStore path in broker
+tlsKeyStore=
+
+# TLS KeyStore password for broker
+tlsKeyStorePassword=
+
+# TLS TrustStore type configuration in broker: JKS, PKCS12
+tlsTrustStoreType=JKS
+
+# TLS TrustStore path in broker
+tlsTrustStore=
+
+# TLS TrustStore password for broker
+tlsTrustStorePassword=
+
+# Whether internal client use KeyStore type to authenticate with Pulsar brokers
+brokerClientTlsEnabledWithKeyStore=false
+
+# The TLS Provider used by internal client to authenticate with other Pulsar brokers
+brokerClientSslProvider=
+
+# TLS TrustStore type configuration for internal client: JKS, PKCS12
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStoreType=JKS
+
+# TLS TrustStore path for internal client
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStore=
+
+# TLS TrustStore password for internal client,
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStorePassword=
+
+# Specify the tls cipher the internal client will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers)
+# e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsCiphers=
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# e.g. [TLSv1.2, TLSv1.1, TLSv1]
+# used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsProtocols=
+
### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6b92061..68d56fe 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -77,6 +77,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_TLS = "TLS";
@Category
+ private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+ @Category
private static final String CATEGORY_AUTHENTICATION = "Authentication";
@Category
private static final String CATEGORY_AUTHORIZATION = "Authorization";
@@ -1458,6 +1460,102 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String transactionMetadataStoreProviderClassName =
"org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
+ /**** --- KeyStore TLS config variables --- ****/
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Enable TLS with KeyStore type configuration in broker"
+ )
+ private boolean tlsEnabledWithKeyStore = false;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS Provider for KeyStore type"
+ )
+ private String tlsProvider = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore type configuration in broker: JKS, PKCS12"
+ )
+ private String tlsKeyStoreType = "JKS";
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore path in broker"
+ )
+ private String tlsKeyStore = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore password for broker"
+ )
+ private String tlsKeyStorePassword = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore type configuration in broker: JKS, PKCS12"
+ )
+ private String tlsTrustStoreType = "JKS";
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore path in broker"
+ )
+ private String tlsTrustStore = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore password for broker"
+ )
+ private String tlsTrustStorePassword = null;
+
+ /**** --- KeyStore TLS config variables used for internal client/admin to auth with other broker--- ****/
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Whether internal client use KeyStore type to authenticate with other Pulsar brokers"
+ )
+ private boolean brokerClientTlsEnabledWithKeyStore = false;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "The TLS Provider used by internal client to authenticate with other Pulsar brokers"
+ )
+ private String brokerClientSslProvider = null;
+ // needed when client auth is required
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore type configuration for internal client: JKS, PKCS12 "
+ + " used by the internal client to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStoreType = "JKS";
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore path for internal client, "
+ + " used by the internal client to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStore = null;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore password for internal client, "
+ + " used by the internal client to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStorePassword = null;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Specify the tls cipher the internal client will use to negotiate during TLS Handshake"
+ + " (a comma-separated list of ciphers).\n\n"
+ + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
+ + " used by the internal client to authenticate with Pulsar brokers"
+ )
+ private Set<String> brokerClientTlsCiphers = Sets.newTreeSet();
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
+ + " (a comma-separated list of protocol names).\n\n"
+ + "Examples:- [TLSv1.2, TLSv1.1, TLSv1] \n"
+ + " used by the internal client to authenticate with Pulsar brokers"
+ )
+ private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();
+
/**
* @deprecated See {@link #getConfigurationStoreServers}
*/
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
index 03a9bd3..4e1d33b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
@@ -34,7 +34,6 @@ public class AuthenticationDataHttps extends AuthenticationDataHttp {
/*
* TLS
*/
-
@Override
public boolean hasDataFromTls() {
return (certificates != null);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
index 06b1749..f2667c3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java
@@ -42,7 +42,7 @@ public class OneStageAuthenticationState implements AuthenticationState {
SSLSession sslSession,
AuthenticationProvider provider) throws AuthenticationException {
this.authenticationDataSource = new AuthenticationDataCommand(
- new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);;
+ new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);
this.authRole = provider.authenticate(authenticationDataSource);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 7c76257..be1b276 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -332,11 +333,29 @@ public class PulsarStandalone implements AutoCloseable {
createSampleNameSpace(clusterData, cluster);
} else {
URL webServiceUrlTls = new URL(
- String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePortTls().get()));
+ String.format("https://%s:%d", config.getAdvertisedAddress(), config.getWebServicePortTls().get()));
String brokerServiceUrlTls = String.format("pulsar+ssl://%s:%d", config.getAdvertisedAddress(),
config.getBrokerServicePortTls().get());
- admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrlTls.toString()).authentication(
- config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build();
+ PulsarAdminBuilder builder = PulsarAdmin.builder()
+ .serviceHttpUrl(webServiceUrlTls.toString())
+ .authentication(
+ config.getBrokerClientAuthenticationPlugin(),
+ config.getBrokerClientAuthenticationParameters());
+
+ // set trust store if needed.
+ if (config.isBrokerClientTlsEnabled()) {
+ if (config.isBrokerClientTlsEnabledWithKeyStore()) {
+ builder.useKeyStoreTls(true)
+ .tlsTrustStoreType(config.getBrokerClientTlsTrustStoreType())
+ .tlsTrustStorePath(config.getBrokerClientTlsTrustStore())
+ .tlsTrustStorePassword(config.getBrokerClientTlsTrustStorePassword());
+ } else {
+ builder.tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
+ }
+ builder.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection());
+ }
+
+ admin = builder.build();
ClusterData clusterData = new ClusterData(null, webServiceUrlTls.toString(), null, brokerServiceUrlTls);
createSampleNameSpace(clusterData, cluster);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eeb3b1a..62801ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -522,9 +522,9 @@ public class PulsarService implements AutoCloseable {
final String bootstrapMessage = "bootstrap service "
+ (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
- + (config.getWebServicePortTls().isPresent() ? "tls-port = " + config.getWebServicePortTls() : "")
- + (config.getBrokerServicePort().isPresent() ? "broker url= " + brokerServiceUrl : "")
- + (config.getBrokerServicePortTls().isPresent() ? "broker url= " + brokerServiceUrlTls : "");
+ + (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "")
+ + (config.getBrokerServicePort().isPresent() ? ", broker url= " + brokerServiceUrl : "")
+ + (config.getBrokerServicePortTls().isPresent() ? ", broker tls url= " + brokerServiceUrlTls : "");
LOG.info("messaging service is ready");
LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
@@ -935,10 +935,17 @@ public class PulsarService implements AutoCloseable {
.tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
if (this.getConfiguration().isBrokerClientTlsEnabled()) {
- builder.tlsTrustCertsFilePath(
- isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
- ? this.getConfiguration().getBrokerClientTrustCertsFilePath()
- : this.getConfiguration().getTlsCertificateFilePath());
+ if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
+ builder.useKeyStoreTls(true)
+ .tlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType())
+ .tlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore())
+ .tlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
+ } else {
+ builder.tlsTrustCertsFilePath(
+ isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())
+ ? this.getConfiguration().getBrokerClientTrustCertsFilePath()
+ : this.getConfiguration().getTlsCertificateFilePath());
+ }
}
if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
@@ -964,7 +971,14 @@ public class PulsarService implements AutoCloseable {
conf.getBrokerClientAuthenticationParameters());
if (conf.isBrokerClientTlsEnabled()) {
- builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+ if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
+ builder.useKeyStoreTls(true)
+ .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+ .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
+ .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+ } else {
+ builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+ }
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7a36d0b..ce040ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -786,8 +786,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
.enableTls(true)
- .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
+ if (pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
+ clientBuilder.useKeyStoreTls(true)
+ .tlsTrustStoreType(pulsar.getConfiguration().getBrokerClientTlsTrustStoreType())
+ .tlsTrustStorePath(pulsar.getConfiguration().getBrokerClientTlsTrustStore())
+ .tlsTrustStorePassword(pulsar.getConfiguration()
+ .getBrokerClientTlsTrustStorePassword());
+ } else {
+ clientBuilder.tlsTrustCertsFilePath(pulsar.getConfiguration()
+ .getBrokerClientTrustCertsFilePath());
+ }
} else {
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
@@ -823,8 +832,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
conf.getBrokerClientAuthenticationParameters());
if (isTlsUrl) {
- builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
+ if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
+ builder.useKeyStoreTls(true)
+ .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType())
+ .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore())
+ .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
+ } else {
+ builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
+ }
}
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index ce16a7e..2a2d3d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -20,23 +20,24 @@ package org.apache.pulsar.broker.service;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
-
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
-
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flow.FlowControlHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -45,8 +46,10 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
private final PulsarService pulsar;
private final boolean enableTls;
- private final NettySslContextBuilder sslCtxRefresher;
+ private final boolean tlsEnabledWithKeyStore;
+ private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
private final ServiceConfiguration brokerConf;
+ private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
// This cache is used to maintain a list of active connections to iterate over them
// We keep weak references to have the cache to be auto cleaned up when the connections
@@ -66,13 +69,31 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
super();
this.pulsar = pulsar;
this.enableTls = enableTLS;
+ ServiceConfiguration serviceConfig = pulsar.getConfiguration();
+ this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
if (this.enableTls) {
- ServiceConfiguration serviceConfig = pulsar.getConfiguration();
- sslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
- serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
- serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
- serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
- serviceConfig.getTlsCertRefreshCheckDurationSec());
+ if (tlsEnabledWithKeyStore) {
+ nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+ serviceConfig.getTlsProvider(),
+ serviceConfig.getTlsKeyStoreType(),
+ serviceConfig.getTlsKeyStore(),
+ serviceConfig.getTlsKeyStorePassword(),
+ serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustStoreType(),
+ serviceConfig.getTlsTrustStore(),
+ serviceConfig.getTlsTrustStorePassword(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCiphers(),
+ serviceConfig.getTlsProtocols(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ } else {
+ sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+ serviceConfig.getTlsKeyFilePath(),
+ serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ }
} else {
this.sslCtxRefresher = null;
}
@@ -86,7 +107,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (this.enableTls) {
- ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
+ if (this.tlsEnabledWithKeyStore) {
+ ch.pipeline().addLast(TLS_HANDLER,
+ new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+ } else {
+ ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
+ }
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index ffaec7c..1a41ce1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.web;
import com.google.common.collect.Lists;
-
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
@@ -28,12 +27,13 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
-
import javax.servlet.DispatcherType;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -97,13 +97,30 @@ public class WebService implements AutoCloseable {
Optional<Integer> tlsPort = pulsar.getConfiguration().getWebServicePortTls();
if (tlsPort.isPresent()) {
try {
- SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
- pulsar.getConfiguration().isTlsAllowInsecureConnection(),
- pulsar.getConfiguration().getTlsTrustCertsFilePath(),
- pulsar.getConfiguration().getTlsCertificateFilePath(),
- pulsar.getConfiguration().getTlsKeyFilePath(),
- pulsar.getConfiguration().isTlsRequireTrustedClientCertOnConnect(), true,
- pulsar.getConfiguration().getTlsCertRefreshCheckDurationSec());
+ SslContextFactory sslCtxFactory;
+ ServiceConfiguration config = pulsar.getConfiguration();
+ if (config.isTlsEnabledWithKeyStore()) {
+ sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+ config.getTlsProvider(),
+ config.getTlsKeyStoreType(),
+ config.getTlsKeyStore(),
+ config.getTlsKeyStorePassword(),
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustStoreType(),
+ config.getTlsTrustStore(),
+ config.getTlsTrustStorePassword(),
+ config.isTlsRequireTrustedClientCertOnConnect(),
+ config.getTlsCertRefreshCheckDurationSec()
+ );
+ } else {
+ sslCtxFactory = SecurityUtility.createSslContextFactory(
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustCertsFilePath(),
+ config.getTlsCertificateFilePath(),
+ config.getTlsKeyFilePath(),
+ config.isTlsRequireTrustedClientCertOnConnect(), true,
+ config.getTlsCertRefreshCheckDurationSec());
+ }
httpsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
httpsConnector.setPort(tlsPort.get());
httpsConnector.setHost(pulsar.getBindAddress());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 55bc4a7..9f1eac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -182,14 +182,14 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
/**
* It verifies that AuthenticationTls provides cert refresh functionality.
- *
+ *
* <pre>
* a. Create Auth with invalid cert
* b. Consumer fails with invalid tls certs
* c. refresh cert in provider
* d. Consumer successfully gets created
* </pre>
- *
+ *
* @throws Exception
*/
@Test
@@ -234,5 +234,5 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
return streams[index.intValue()];
- }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
new file mode 100644
index 0000000..ab2833d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AdminApiKeyStoreTlsAuthTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls.mapToString;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.jackson.JacksonFeature;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiKeyStoreTlsAuthTest extends ProducerConsumerBase {
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_CN = "clientuser";
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ private final String clusterName = "test";
+ Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ conf.setLoadBalancerEnabled(true);
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
+
+ conf.setTlsEnabledWithKeyStore(true);
+ conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+ conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+ conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+ conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+ conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+ conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+ conf.setClusterName(clusterName);
+ conf.setTlsRequireTrustedClientCertOnConnect(true);
+ tlsProtocols.add("TLSv1.2");
+ conf.setTlsProtocols(tlsProtocols);
+
+ // config for authentication and authorization.
+ conf.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setBrokerClientTlsEnabled(true);
+ conf.setBrokerClientTlsEnabledWithKeyStore(true);
+
+ // set broker client tls auth
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
+ conf.setBrokerClientAuthenticationParameters(mapToString(authParams));
+ conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
+ conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
+
+ super.init();
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ WebTarget buildWebClient() throws Exception {
+ ClientConfig httpConfig = new ClientConfig();
+ httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
+ httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
+ httpConfig.register(MultiPartFeature.class);
+
+ ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig)
+ .register(JacksonConfigurator.class).register(JacksonFeature.class);
+
+ SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext(
+ KEYSTORE_TYPE,
+ CLIENT_KEYSTORE_FILE_PATH,
+ CLIENT_KEYSTORE_PW,
+ KEYSTORE_TYPE,
+ BROKER_TRUSTSTORE_FILE_PATH,
+ BROKER_TRUSTSTORE_PW);
+
+ clientBuilder.sslContext(sslCtx).hostnameVerifier(NoopHostnameVerifier.INSTANCE);
+ Client client = clientBuilder.build();
+
+ return client.target(brokerUrlTls.toString());
+ }
+
+ PulsarAdmin buildAdminClient() throws Exception {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+ return PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrlTls.toString())
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .authentication(AuthenticationKeyStoreTls.class.getName(), authParams)
+ .build();
+ }
+
+ @Test
+ public void testSuperUserCanListTenants() throws Exception {
+ try (PulsarAdmin admin = buildAdminClient()) {
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("tenant1",
+ new TenantInfo(ImmutableSet.of("foobar"),
+ ImmutableSet.of("test")));
+ Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+ }
+ }
+
+ @Test
+ public void testSuperUserCantListNamespaces() throws Exception {
+ try (PulsarAdmin admin = buildAdminClient()) {
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("tenant1",
+ new TenantInfo(ImmutableSet.of("proxy"),
+ ImmutableSet.of("test")));
+ admin.namespaces().createNamespace("tenant1/ns1");
+ admin.namespaces().getNamespaces("tenant1").contains("tenant1/ns1");
+ }
+ }
+
+ @Test
+ public void testAuthorizedUserAsOriginalPrincipal() throws Exception {
+ try (PulsarAdmin admin = buildAdminClient()) {
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("tenant1",
+ new TenantInfo(ImmutableSet.of("proxy", "user1"),
+ ImmutableSet.of("test")));
+ admin.namespaces().createNamespace("tenant1/ns1");
+ }
+ WebTarget root = buildWebClient();
+ Assert.assertEquals(ImmutableSet.of("tenant1/ns1"),
+ root.path("/admin/v2/namespaces").path("tenant1")
+ .request(MediaType.APPLICATION_JSON)
+ .header("X-Original-Principal", "user1")
+ .get(new GenericType<List<String>>() {}));
+ }
+
+ @Test
+ public void testPersistentList() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ try (PulsarAdmin admin = buildAdminClient()) {
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("tenant1",
+ new TenantInfo(ImmutableSet.of("foobar"),
+ ImmutableSet.of("test")));
+ Assert.assertEquals(ImmutableSet.of("tenant1"), admin.tenants().getTenants());
+
+ admin.namespaces().createNamespace("tenant1/ns1");
+
+ // this will calls internal admin to list nonpersist topics.
+ admin.topics().getList("tenant1/ns1");
+ } catch (PulsarAdminException ex) {
+ ex.printStackTrace();
+ fail("Should not have thrown an exception");
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
new file mode 100644
index 0000000..14177e0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuth.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+// TLS authentication and authorization based on KeyStore type config.
+@Slf4j
+public class KeyStoreTlsProducerConsumerTestWithAuth extends ProducerConsumerBase {
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_CN = "clientuser";
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ private final String clusterName = "use";
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ // TLS configuration for Broker
+ internalSetUpForBroker();
+
+ // Start Broker
+
+ super.init();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ protected void internalSetUpForBroker() throws Exception {
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsEnabledWithKeyStore(true);
+
+ conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+ conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+ conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+ conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+ conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+ conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+ conf.setClusterName(clusterName);
+ conf.setTlsRequireTrustedClientCertOnConnect(true);
+
+ // config for authentication and authorization.
+ conf.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ conf.setAuthenticationProviders(providers);
+ }
+
+ protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ }
+
+ Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+ tlsProtocols.add("TLSv1.2");
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(lookupUrl)
+ .enableTls(true)
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .tlsProtocols(tlsProtocols)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ if (addCertificates) {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+ }
+ pulsarClient = clientBuilder.build();
+ }
+
+ protected void internalSetUpForNamespace() throws Exception {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+ if (admin != null) {
+ admin.close();
+ }
+
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .authentication(AuthenticationKeyStoreTls.class.getName(), authParams).build());
+ admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("my-property/my-ns");
+ }
+
+ /**
+ * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
+ * produced/consumed
+ *
+ * @throws Exception
+ */
+ @Test(timeOut = 30000)
+ public void testTlsLargeSizeMessage() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsLargeSizeMessage"
+ + System.currentTimeMillis();
+
+ internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+ internalSetUpForNamespace();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ byte[] message = new byte[MESSAGE_SIZE];
+ Arrays.fill(message, (byte) i);
+ producer.send(message);
+ }
+
+ Message<byte[]> msg = null;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ byte[] expected = new byte[MESSAGE_SIZE];
+ Arrays.fill(expected, (byte) i);
+ Assert.assertEquals(expected, msg.getData());
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test(timeOut = 300000)
+ public void testTlsClientAuthOverBinaryProtocol() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverBinaryProtocol"
+ + System.currentTimeMillis();
+
+ internalSetUpForNamespace();
+
+ // Test 1 - Using TLS on binary protocol without sending certs - expect failure
+ internalSetUpForClient(false, pulsar.getBrokerServiceUrlTls());
+
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ Assert.fail("Server should have failed the TLS handshake since client didn't .");
+ } catch (Exception ex) {
+ // OK
+ }
+
+ // Using TLS on binary protocol - sending certs
+ internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ } catch (Exception ex) {
+ Assert.fail("Should not fail since certs are sent.");
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testTlsClientAuthOverHTTPProtocol() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverHTTPProtocol"
+ + System.currentTimeMillis();
+
+ internalSetUpForNamespace();
+
+ // Test 1 - Using TLS on https without sending certs - expect failure
+ internalSetUpForClient(false, pulsar.getWebServiceAddressTls());
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ Assert.fail("Server should have failed the TLS handshake since client didn't .");
+ } catch (Exception ex) {
+ // OK
+ }
+
+ // Test 2 - Using TLS on https - sending certs
+ internalSetUpForClient(true, pulsar.getWebServiceAddressTls());
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ } catch (Exception ex) {
+ Assert.fail("Should not fail since certs are sent.");
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
new file mode 100644
index 0000000..c95f3df
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithoutAuth.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+// TLS test without authentication and authorization based on KeyStore type config.
+@Slf4j
+public class KeyStoreTlsProducerConsumerTestWithoutAuth extends ProducerConsumerBase {
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ private final String clusterName = "use";
+ Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ // TLS configuration for Broker
+ internalSetUpForBroker();
+
+ // Start Broker
+ super.init();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ protected void internalSetUpForBroker() throws Exception {
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setTlsEnabledWithKeyStore(true);
+
+ conf.setTlsKeyStoreType(KEYSTORE_TYPE);
+ conf.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+ conf.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+
+ conf.setTlsTrustStoreType(KEYSTORE_TYPE);
+ conf.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+ conf.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+ conf.setClusterName(clusterName);
+ conf.setTlsRequireTrustedClientCertOnConnect(true);
+ tlsProtocols.add("TLSv1.2");
+ conf.setTlsProtocols(tlsProtocols);
+ }
+
+ protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ }
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(lookupUrl)
+ .enableTls(true)
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ if (addCertificates) {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+ }
+ pulsarClient = clientBuilder.build();
+ }
+
+ protected void internalSetUpForNamespace() throws Exception {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+
+ if (admin != null) {
+ admin.close();
+ }
+
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrlTls.toString())
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(true)
+ .authentication(AuthenticationKeyStoreTls.class.getName(), authParams).build());
+ admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()));
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("my-property/my-ns");
+ }
+
+ /**
+ * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
+ * produced/consumed
+ *
+ * @throws Exception
+ */
+ @Test(timeOut = 30000)
+ public void testTlsLargeSizeMessage() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsLargeSizeMessage"
+ + System.currentTimeMillis();
+
+ internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+ internalSetUpForNamespace();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ byte[] message = new byte[MESSAGE_SIZE];
+ Arrays.fill(message, (byte) i);
+ producer.send(message);
+ }
+
+ Message<byte[]> msg = null;
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ byte[] expected = new byte[MESSAGE_SIZE];
+ Arrays.fill(expected, (byte) i);
+ Assert.assertEquals(expected, msg.getData());
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test(timeOut = 300000)
+ public void testTlsClientAuthOverBinaryProtocol() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverBinaryProtocol"
+ + System.currentTimeMillis();
+
+ internalSetUpForNamespace();
+
+ // Test 1 - Using TLS on binary protocol without sending certs - expect failure
+ internalSetUpForClient(false, pulsar.getBrokerServiceUrlTls());
+
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ Assert.fail("Server should have failed the TLS handshake since client didn't .");
+ } catch (Exception ex) {
+ // OK
+ }
+
+ // Test 2 - Using TLS on binary protocol - sending certs
+ internalSetUpForClient(true, pulsar.getBrokerServiceUrlTls());
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ } catch (Exception ex) {
+ Assert.fail("Should not fail since certs are sent.");
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void testTlsClientAuthOverHTTPProtocol() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final int MESSAGE_SIZE = 16 * 1024 + 1;
+ log.info("-- message size --", MESSAGE_SIZE);
+ String topicName = "persistent://my-property/use/my-ns/testTlsClientAuthOverHTTPProtocol"
+ + System.currentTimeMillis();
+
+ internalSetUpForNamespace();
+
+ // Test 1 - Using TLS on https without sending certs - expect failure
+ internalSetUpForClient(false, pulsar.getWebServiceAddressTls());
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ Assert.fail("Server should have failed the TLS handshake since client didn't .");
+ } catch (Exception ex) {
+ // OK
+ }
+
+ // Test 2 - Using TLS on https - sending certs
+ internalSetUpForClient(true, pulsar.getWebServiceAddressTls());
+ try {
+ pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Exclusive).subscribe();
+ } catch (Exception ex) {
+ Assert.fail("Should not fail since certs are sent.");
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java
new file mode 100644
index 0000000..0f9993d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.common.util.SecurityUtility.getProvider;
+
+import java.security.Provider;
+import javax.net.ssl.SSLContext;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.apache.pulsar.common.util.keystoretls.SSLContextValidatorEngine;
+import org.testng.annotations.Test;
+
+public class KeyStoreTlsTest {
+
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ public static final Provider BC_PROVIDER = getProvider();
+
+ @Test(timeOut = 300000)
+ public void testValidate() throws Exception {
+ KeyStoreSSLContext serverSSLContext = new KeyStoreSSLContext(KeyStoreSSLContext.Mode.SERVER,
+ null,
+ KEYSTORE_TYPE,
+ BROKER_KEYSTORE_FILE_PATH,
+ BROKER_KEYSTORE_PW,
+ false,
+ KEYSTORE_TYPE,
+ BROKER_TRUSTSTORE_FILE_PATH,
+ BROKER_TRUSTSTORE_PW,
+ true,
+ null,
+ null);
+ SSLContext serverCnx = serverSSLContext.createSSLContext();
+
+ KeyStoreSSLContext clientSSLContext = new KeyStoreSSLContext(KeyStoreSSLContext.Mode.CLIENT,
+ null,
+ KEYSTORE_TYPE,
+ CLIENT_KEYSTORE_FILE_PATH,
+ CLIENT_KEYSTORE_PW,
+ false,
+ KEYSTORE_TYPE,
+ CLIENT_TRUSTSTORE_FILE_PATH,
+ CLIENT_TRUSTSTORE_PW,
+ false,
+ null,
+ null);
+ SSLContext clientCnx = clientSSLContext.createSSLContext();
+
+ SSLContextValidatorEngine.validate(clientCnx, serverCnx);
+ }
+}
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.keystore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/broker.truststore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/client.keystore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks b/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-broker/src/test/resources/authentication/keystoretls/client.truststore.jks differ
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt b/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index e4e04af..81b3ff6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
@@ -172,7 +173,64 @@ public interface PulsarAdminBuilder {
PulsarAdminBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
/**
- * This sets the connection time out for the pulsar admin client
+ * If Tls is enabled, whether use KeyStore type as tls configuration parameter.
+ * False means use default pem type configuration.
+ *
+ * @param useKeyStoreTls
+ */
+ PulsarAdminBuilder useKeyStoreTls(boolean useKeyStoreTls);
+
+ /**
+ * The name of the security provider used for SSL connections.
+ * Default value is the default security provider of the JVM.
+ *
+ * @param sslProvider
+ */
+ PulsarAdminBuilder sslProvider(String sslProvider);
+
+ /**
+ * The file format of the trust store file.
+ *
+ * @param tlsTrustStoreType
+ */
+ PulsarAdminBuilder tlsTrustStoreType(String tlsTrustStoreType);
+
+ /**
+ * The location of the trust store file.
+ *
+ * @param tlsTrustStorePath
+ */
+ PulsarAdminBuilder tlsTrustStorePath(String tlsTrustStorePath);
+
+ /**
+ * The store password for the key store file.
+ *
+ * @param tlsTrustStorePassword
+ * @return the client builder instance
+ */
+ PulsarAdminBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
+
+ /**
+ * A list of cipher suites.
+ * This is a named combination of authentication, encryption, MAC and key exchange algorithm
+ * used to negotiate the security settings for a network connection using TLS or SSL network protocol.
+ * By default all the available cipher suites are supported.
+ *
+ * @param tlsCiphers
+ */
+ PulsarAdminBuilder tlsCiphers(Set<String> tlsCiphers);
+
+ /**
+ * The SSL protocol used to generate the SSLContext.
+ * Default setting is TLS, which is fine for most cases.
+ * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2.
+ *
+ * @param tlsProtocols
+ */
+ PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols);
+
+ /**
+ * This sets the connection time out for the pulsar admin client.
*
* @param connectionTimeout
* @param connectionTimeoutUnit
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index bb5588b..d62ac33 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.client.admin.internal;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
@@ -26,9 +30,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
protected final ClientConfigurationData conf;
@@ -104,6 +105,48 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
}
@Override
+ public PulsarAdminBuilder useKeyStoreTls(boolean useKeyStoreTls) {
+ conf.setUseKeyStoreTls(useKeyStoreTls);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder sslProvider(String sslProvider) {
+ conf.setSslProvider(sslProvider);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder tlsTrustStoreType(String tlsTrustStoreType) {
+ conf.setTlsTrustStoreType(tlsTrustStoreType);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder tlsTrustStorePath(String tlsTrustStorePath) {
+ conf.setTlsTrustStorePath(tlsTrustStorePath);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder tlsTrustStorePassword(String tlsTrustStorePassword) {
+ conf.setTlsTrustStorePassword(tlsTrustStorePassword);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder tlsCiphers(Set<String> tlsCiphers) {
+ conf.setTlsCiphers(tlsCiphers);
+ return this;
+ }
+
+ @Override
+ public PulsarAdminBuilder tlsProtocols(Set<String> tlsProtocols) {
+ conf.setTlsProtocols(tlsProtocols);
+ return this;
+ }
+
+ @Override
public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) {
this.connectTimeout = connectionTimeout;
this.connectTimeoutUnit = connectionTimeoutUnit;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index c90bf60..42e9c49 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -34,7 +34,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
import javax.ws.rs.ProcessingException;
+import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
@@ -47,10 +49,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncCompletionHandler;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
@@ -58,6 +62,7 @@ import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
@@ -100,20 +105,41 @@ public class AsyncHttpConnector implements Connector {
if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) {
serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
if (conf.getServiceUrl().startsWith("https://")) {
-
- SslContext sslCtx = null;
-
// Set client key and certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
- if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(), authData.getTlsPrivateKey());
+
+ if (conf.isUseKeyStoreTls()) {
+ KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
+
+ final SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext(
+ conf.getSslProvider(),
+ params != null ? params.getKeyStoreType() : null,
+ params != null ? params.getKeyStorePath() : null,
+ params != null ? params.getKeyStorePassword() : null,
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustStoreType(),
+ conf.getTlsTrustStorePath(),
+ conf.getTlsTrustStorePassword(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols());
+
+ JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
+ confBuilder.setSslEngineFactory(sslEngineFactory);
} else {
- sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath());
+ SslContext sslCtx = null;
+ if (authData.hasDataForTls()) {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustCertsFilePath(),
+ authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
+ } else {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustCertsFilePath());
+ }
+ confBuilder.setSslContext(sslCtx);
}
-
- confBuilder.setSslContext(sslCtx);
}
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 122dd5b..77eafe5 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -63,6 +63,15 @@ public interface AuthenticationDataProvider extends Serializable {
return null;
}
+ /**
+ * Used for TLS authentication with keystore type.
+ *
+ * @return a KeyStoreParams for the client certificate chain, or null if the data are not available
+ */
+ default KeyStoreParams getTlsKeyStoreParams() {
+ return null;
+ }
+
/*
* HTTP
*/
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index addedaa..e84f8ba 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api;
import java.time.Clock;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -290,6 +291,69 @@ public interface ClientBuilder extends Cloneable {
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
/**
+ * If Tls is enabled, whether use KeyStore type as tls configuration parameter.
+ * False means use default pem type configuration.
+ *
+ * @param useKeyStoreTls
+ * @return the client builder instance
+ */
+ ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);
+
+ /**
+ * The name of the security provider used for SSL connections.
+ * Default value is the default security provider of the JVM.
+ *
+ * @param sslProvider
+ * @return the client builder instance
+ */
+ ClientBuilder sslProvider(String sslProvider);
+
+ /**
+ * The file format of the trust store file.
+ *
+ * @param tlsTrustStoreType
+ * @return the client builder instance
+ */
+ ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);
+
+ /**
+ * The location of the trust store file.
+ *
+ * @param tlsTrustStorePath
+ * @return the client builder instance
+ */
+ ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);
+
+ /**
+ * The store password for the key store file.
+ *
+ * @param tlsTrustStorePassword
+ * @return the client builder instance
+ */
+ ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
+
+ /**
+ * A list of cipher suites.
+ * This is a named combination of authentication, encryption, MAC and key exchange algorithm
+ * used to negotiate the security settings for a network connection using TLS or SSL network protocol.
+ * By default all the available cipher suites are supported.
+ *
+ * @param tlsCiphers
+ * @return the client builder instance
+ */
+ ClientBuilder tlsCiphers(Set<String> tlsCiphers);
+
+ /**
+ * The SSL protocol used to generate the SSLContext.
+ * Default setting is TLS, which is fine for most cases.
+ * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2.
+ *
+ * @param tlsProtocols
+ * @return the client builder instance
+ */
+ ClientBuilder tlsProtocols(Set<String> tlsProtocols);
+
+ /**
* Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
* statsInterval It should be set to at least 1 second.
*
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
similarity index 54%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
index 03a9bd3..5759801 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeyStoreParams.java
@@ -16,33 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.authentication;
+package org.apache.pulsar.client.api;
-import java.security.cert.X509Certificate;
-
-import javax.servlet.http.HttpServletRequest;
-
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
- protected final X509Certificate[] certificates;
-
- public AuthenticationDataHttps(HttpServletRequest request) {
- super(request);
- certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
- }
-
- /*
- * TLS
- */
-
- @Override
- public boolean hasDataFromTls() {
- return (certificates != null);
- }
-
- @Override
- public X509Certificate[] getTlsCertificates() {
- return certificates;
- }
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+/**
+ * KeyStore parameters used for tls authentication.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+public class KeyStoreParams{
+ private String keyStoreType;
+ private String keyStorePath;
+ private String keyStorePassword;
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 73f4c8d..6391697 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -53,16 +53,22 @@ public class PulsarAdminTool {
@Parameter(names = { "--tls-allow-insecure" }, description = "Allow TLS insecure connection")
Boolean tlsAllowInsecureConnection;
-
+
@Parameter(names = { "--tls-trust-cert-path" }, description = "Allow TLS trust cert file path")
String tlsTrustCertsFilePath;
-
+
@Parameter(names = { "--tls-enable-hostname-verification" }, description = "Enable TLS common name verification")
Boolean tlsEnableHostnameVerification;
@Parameter(names = { "-h", "--help", }, help = true, description = "Show this help.")
boolean help;
+ // for tls with keystore type config
+ boolean useKeyStoreTls = false;
+ String tlsTrustStoreType = "JKS";
+ String tlsTrustStorePath = null;
+ String tlsTrustStorePassword = null;
+
PulsarAdminTool(Properties properties) throws Exception {
// fallback to previous-version serviceUrl property to maintain backward-compatibility
serviceUrl = StringUtils.isNotBlank(properties.getProperty("webServiceUrl"))
@@ -80,9 +86,19 @@ public class PulsarAdminTool {
? this.tlsTrustCertsFilePath
: properties.getProperty("tlsTrustCertsFilePath");
+ this.useKeyStoreTls = Boolean
+ .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+ this.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+ this.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+ this.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
adminBuilder = PulsarAdmin.builder().allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsEnableHostnameVerification)
- .tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+ .useKeyStoreTls(useKeyStoreTls)
+ .tlsTrustStoreType(tlsTrustStoreType)
+ .tlsTrustStorePath(tlsTrustStorePath)
+ .tlsTrustStorePassword(tlsTrustStorePassword);
jcommander = new JCommander();
jcommander.setProgramName("pulsar-admin");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 2c38b34..b86bc79 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -60,6 +60,12 @@ public class PulsarClientTool {
boolean tlsEnableHostnameVerification = false;
String tlsTrustCertsFilePath = null;
+ // for tls with keystore type config
+ boolean useKeyStoreTls = false;
+ String tlsTrustStoreType = "JKS";
+ String tlsTrustStorePath = null;
+ String tlsTrustStorePassword = null;
+
JCommander commandParser;
CmdProduce produceCommand;
CmdConsume consumeCommand;
@@ -79,6 +85,12 @@ public class PulsarClientTool {
.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
this.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
+ this.useKeyStoreTls = Boolean
+ .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+ this.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+ this.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+ this.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
produceCommand = new CmdProduce();
consumeCommand = new CmdConsume();
@@ -99,6 +111,12 @@ public class PulsarClientTool {
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.serviceUrl(serviceURL);
+
+ clientBuilder.useKeyStoreTls(useKeyStoreTls)
+ .tlsTrustStoreType(tlsTrustStoreType)
+ .tlsTrustStorePath(tlsTrustStorePath)
+ .tlsTrustStorePassword(tlsTrustStorePassword);
+
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 16ea688..3283166 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import java.time.Clock;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -174,6 +175,48 @@ public class ClientBuilderImpl implements ClientBuilder {
}
@Override
+ public ClientBuilder useKeyStoreTls(boolean useKeyStoreTls) {
+ conf.setUseKeyStoreTls(useKeyStoreTls);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder sslProvider(String sslProvider) {
+ conf.setSslProvider(sslProvider);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder tlsTrustStoreType(String tlsTrustStoreType) {
+ conf.setTlsTrustStoreType(tlsTrustStoreType);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder tlsTrustStorePath(String tlsTrustStorePath) {
+ conf.setTlsTrustStorePath(tlsTrustStorePath);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword) {
+ conf.setTlsTrustStorePassword(tlsTrustStorePassword);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder tlsCiphers(Set<String> tlsCiphers) {
+ conf.setTlsCiphers(tlsCiphers);
+ return this;
+ }
+
+ @Override
+ public ClientBuilder tlsProtocols(Set<String> tlsProtocols) {
+ conf.setTlsProtocols(tlsProtocols);
+ return this;
+ }
+
+ @Override
public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
return this;
@@ -214,13 +257,13 @@ public class ClientBuilderImpl implements ClientBuilder {
conf.setInitialBackoffIntervalNanos(unit.toNanos(duration));
return this;
}
-
+
@Override
public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
return this;
}
-
+
public ClientConfigurationData getClientConfigurationData() {
return conf;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 845c741..3e693ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -32,14 +32,18 @@ import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
+import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
@@ -47,6 +51,7 @@ import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Request;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
+import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
@Slf4j
@@ -59,24 +64,15 @@ public class HttpClient implements Closeable {
protected final ServiceNameResolver serviceNameResolver;
protected final Authentication authentication;
- protected HttpClient(String serviceUrl, Authentication authentication,
- EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath)
- throws PulsarClientException {
- this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection,
- tlsTrustCertsFilePath, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
- }
-
- protected HttpClient(String serviceUrl, Authentication authentication,
- EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
- int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
- this.authentication = authentication;
+ protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
+ this.authentication = conf.getAuthentication();
this.serviceNameResolver = new PulsarServiceNameResolver();
- this.serviceNameResolver.updateServiceUrl(serviceUrl);
+ this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
- confBuilder.setConnectTimeout(connectTimeoutInSeconds * 1000);
- confBuilder.setReadTimeout(readTimeoutInSeconds * 1000);
+ confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+ confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
@Override
@@ -88,19 +84,45 @@ public class HttpClient implements Closeable {
if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
try {
- SslContext sslCtx = null;
-
// Set client key and certificate if available
AuthenticationDataProvider authData = authentication.getAuthData();
- if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath,
- authData.getTlsCertificates(), authData.getTlsPrivateKey());
+
+ if (conf.isUseKeyStoreTls()) {
+ SSLContext sslCtx = null;
+ KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
+
+ sslCtx = KeyStoreSSLContext.createClientSslContext(
+ conf.getSslProvider(),
+ params != null ? params.getKeyStoreType() : null,
+ params != null ? params.getKeyStorePath() : null,
+ params != null ? params.getKeyStorePassword() : null,
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustStoreType(),
+ conf.getTlsTrustStorePath(),
+ conf.getTlsTrustStorePassword(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols());
+
+ JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx);
+ confBuilder.setSslEngineFactory(sslEngineFactory);
} else {
- sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath);
+ SslContext sslCtx = null;
+ if (authData.hasDataForTls()) {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(),
+ authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
+ }
+ else {
+ sslCtx = SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath());
+ }
+ confBuilder.setSslContext(sslCtx);
}
- confBuilder.setSslContext(sslCtx);
- confBuilder.setUseInsecureTrustManager(tlsAllowInsecureConnection);
+ confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
} catch (Exception e) {
throw new PulsarClientException.InvalidConfigurationException(e);
}
@@ -109,7 +131,7 @@ public class HttpClient implements Closeable {
AsyncHttpClientConfig config = confBuilder.build();
httpClient = new DefaultAsyncHttpClient(config);
- log.debug("Using HTTP url: {}", serviceUrl);
+ log.debug("Using HTTP url: {}", conf.getServiceUrl());
}
String getServiceUrl() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 5bc3bcd..3451ebc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -57,8 +57,7 @@ public class HttpLookupService implements LookupService {
public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
- this.httpClient = new HttpClient(conf.getServiceUrl(), conf.getAuthentication(),
- eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
+ this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index a932253..4a145e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,30 +18,34 @@
*/
package org.apache.pulsar.client.impl;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
-
+import io.netty.handler.ssl.SslHandler;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ObjectCache;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
public static final String TLS_HANDLER = "tls";
private final Supplier<ClientCnx> clientCnxSupplier;
private final boolean tlsEnabled;
+ private final boolean tlsEnabledWithKeyStore;
private final Supplier<SslContext> sslContextSupplier;
+ private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
@@ -50,8 +54,24 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
super();
this.clientCnxSupplier = clientCnxSupplier;
this.tlsEnabled = conf.isUseTls();
+ this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
+
+ if (tlsEnabled) {
+ if (tlsEnabledWithKeyStore) {
+ AuthenticationDataProvider authData1 = conf.getAuthentication().getAuthData();
+
+ nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+ conf.getSslProvider(),
+ conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustStoreType(),
+ conf.getTlsTrustStorePath(),
+ conf.getTlsTrustStorePassword(),
+ conf.getTlsCiphers(),
+ conf.getTlsProtocols(),
+ TLS_CERTIFICATE_CACHE_MILLIS,
+ authData1);
+ }
- if (conf.isUseTls()) {
sslContextSupplier = new ObjectCache<SslContext>(() -> {
try {
// Set client certificate if available
@@ -76,7 +96,12 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
public void initChannel(SocketChannel ch) throws Exception {
if (tlsEnabled) {
- ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
+ if (tlsEnabledWithKeyStore) {
+ ch.pipeline().addLast(TLS_HANDLER,
+ new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+ } else {
+ ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc()));
+ }
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
similarity index 56%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
index 03a9bd3..6d78004 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataKeyStoreTls.java
@@ -16,33 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.authentication;
+package org.apache.pulsar.client.impl.auth;
-import java.security.cert.X509Certificate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
-import javax.servlet.http.HttpServletRequest;
+@Slf4j
+public class AuthenticationDataKeyStoreTls implements AuthenticationDataProvider {
+ private final KeyStoreParams keyStoreParams;
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
- protected final X509Certificate[] certificates;
-
- public AuthenticationDataHttps(HttpServletRequest request) {
- super(request);
- certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
+ public AuthenticationDataKeyStoreTls(KeyStoreParams keyStoreParams) throws Exception {
+ this.keyStoreParams = keyStoreParams;
}
/*
* TLS
*/
-
@Override
- public boolean hasDataFromTls() {
- return (certificates != null);
+ public boolean hasDataForTls() {
+ return true;
}
@Override
- public X509Certificate[] getTlsCertificates() {
- return certificates;
+ public KeyStoreParams getTlsKeyStoreParams() {
+ return this.keyStoreParams;
}
+ @Override
+ public String getCommandData() {
+ return null;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
new file mode 100644
index 0000000..e8c7764
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationKeyStoreTls.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.KeyStoreParams;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+
+/**
+ * This plugin requires these parameters: keyStoreType, keyStorePath, and keyStorePassword.
+ * This parameter will construct a AuthenticationDataProvider
+ */
+@Slf4j
+public class AuthenticationKeyStoreTls implements Authentication, EncodedAuthenticationParameterSupport {
+ private static final long serialVersionUID = 1L;
+
+ private final static String AUTH_NAME = "tls";
+
+ // parameter name
+ public final static String KEYSTORE_TYPE = "keyStoreType";
+ public final static String KEYSTORE_PATH= "keyStorePath";
+ public final static String KEYSTORE_PW = "keyStorePassword";
+ private final static String DEFAULT_KEYSTORE_TYPE = "JKS";
+
+ private KeyStoreParams keyStoreParams;
+
+ public AuthenticationKeyStoreTls() {
+ }
+
+ public AuthenticationKeyStoreTls(String keyStoreType, String keyStorePath, String keyStorePassword) {
+ this.keyStoreParams = KeyStoreParams.builder()
+ .keyStoreType(keyStoreType)
+ .keyStorePath(keyStorePath)
+ .keyStorePassword(keyStorePassword)
+ .build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return AUTH_NAME;
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ try {
+ return new AuthenticationDataKeyStoreTls(this.keyStoreParams);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ // passed in KEYSTORE_TYPE/KEYSTORE_PATH/KEYSTORE_PW to construct parameters.
+ // e.g. {"keyStoreType":"JKS","keyStorePath":"/path/to/keystorefile","keyStorePassword":"keystorepw"}
+ // or: "keyStoreType":"JKS","keyStorePath":"/path/to/keystorefile","keyStorePassword":"keystorepw"
+ @Override
+ public void configure(String paramsString) {
+ Map<String, String> params = null;
+ try {
+ params = AuthenticationUtil.configureFromJsonString(paramsString);
+ } catch (Exception e) {
+ // auth-param is not in json format
+ log.info("parameter not in Json format: {}", paramsString);
+ }
+
+ // in ":" "," format.
+ params = (params == null || params.isEmpty())
+ ? AuthenticationUtil.configureFromPulsar1AuthParamString(paramsString)
+ : params;
+
+ configure(params);
+ }
+
+ @Override
+ public void configure(Map<String, String> params) {
+ String keyStoreType = params.get(KEYSTORE_TYPE);
+ String keyStorePath = params.get(KEYSTORE_PATH);
+ String keyStorePassword = params.get(KEYSTORE_PW);
+
+ if (Strings.isNullOrEmpty(keyStorePath)
+ || Strings.isNullOrEmpty(keyStorePassword)) {
+ throw new IllegalArgumentException("Passed in parameter empty. "
+ + KEYSTORE_PATH + ": " + keyStorePath
+ + " " + KEYSTORE_PW + ": " + keyStorePassword);
+ }
+
+ if (Strings.isNullOrEmpty(keyStoreType)) {
+ keyStoreType = DEFAULT_KEYSTORE_TYPE;
+ }
+
+ this.keyStoreParams = KeyStoreParams.builder()
+ .keyStoreType(keyStoreType)
+ .keyStorePath(keyStorePath)
+ .keyStorePassword(keyStorePassword)
+ .build();
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+
+ // return strings like : "key1":"value1", "key2":"value2", ...
+ public static String mapToString(Map<String, String> map) {
+ return Joiner.on(',').withKeyValueSeparator(':').join(map);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index 22cd2f5..d899146 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -40,7 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
*
*/
public class AuthenticationTls implements Authentication, EncodedAuthenticationParameterSupport {
-
+ private final static String AUTH_NAME = "tls";
private static final long serialVersionUID = 1L;
private String certFilePath;
@@ -67,7 +67,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP
@Override
public String getAuthMethodName() {
- return "tls";
+ return AUTH_NAME;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index af6ad8d..0cf496a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Sets;
import java.time.Clock;
+import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -70,6 +72,16 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);
+ // set TLS using KeyStore way.
+ private boolean useKeyStoreTls = false;
+ private String sslProvider = null;
+ // needed when client auth is required
+ private String tlsTrustStoreType = "JKS";
+ private String tlsTrustStorePath = null;
+ private String tlsTrustStorePassword = null;
+ private Set<String> tlsCiphers = Sets.newTreeSet();
+ private Set<String> tlsProtocols = Sets.newTreeSet();
+
@JsonIgnore
private Clock clock = Clock.systemDefaultZone();
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 3c74a4e..6974b0c 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -146,6 +146,10 @@
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
deleted file mode 100644
index 48ac937..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClientSslContextRefresher.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util;
-
-import io.netty.handler.ssl.SslContext;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
-import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("checkstyle:JavadocType")
-public class ClientSslContextRefresher {
- private volatile SslContext sslContext;
- private boolean tlsAllowInsecureConnection;
- private String tlsTrustCertsFilePath;
- private AuthenticationDataProvider authData;
-
- public ClientSslContextRefresher(boolean allowInsecure, String trustCertsFilePath,
- AuthenticationDataProvider authData) throws IOException, GeneralSecurityException {
- this.tlsAllowInsecureConnection = allowInsecure;
- this.tlsTrustCertsFilePath = trustCertsFilePath;
- this.authData = authData;
-
- if (authData != null && authData.hasDataForTls()) {
- this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
- this.tlsTrustCertsFilePath, (X509Certificate[]) authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
- } else {
- this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
- this.tlsTrustCertsFilePath);
- }
- }
-
- public SslContext get() {
- if (authData != null && authData.hasDataForTls()) {
- try {
- this.sslContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
- this.tlsTrustCertsFilePath, (X509Certificate[]) authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
- } catch (GeneralSecurityException | IOException e) {
- LOG.error("Exception occured while trying to refresh sslContext: ", e);
- }
-
- }
- return sslContext;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(ClientSslContextRefresher.class);
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
index 3e888f4..c49bdd6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DefaultSslContextBuilder.java
@@ -29,11 +29,19 @@ import javax.net.ssl.SSLException;
public class DefaultSslContextBuilder extends SslContextAutoRefreshBuilder<SSLContext> {
private volatile SSLContext sslContext;
+ protected final boolean tlsAllowInsecureConnection;
+ protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
+ protected final boolean tlsRequireTrustedClientCertOnConnect;
+
public DefaultSslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
String keyFilePath, boolean requireTrustedClientCertOnConnect, long certRefreshInSec)
throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
- super(allowInsecure, trustCertsFilePath, certificateFilePath, keyFilePath, null, null,
- requireTrustedClientCertOnConnect, certRefreshInSec);
+ super(certRefreshInSec);
+ this.tlsAllowInsecureConnection = allowInsecure;
+ this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+ this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
+ this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
+ this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
}
@Override
@@ -49,4 +57,10 @@ public class DefaultSslContextBuilder extends SslContextAutoRefreshBuilder<SSLCo
return this.sslContext;
}
+ @Override
+ public boolean needUpdate() {
+ return tlsTrustCertsFilePath.checkAndRefresh()
+ || tlsCertificateFilePath.checkAndRefresh()
+ || tlsKeyFilePath.checkAndRefresh();
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
new file mode 100644
index 0000000..48cf992
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import io.netty.handler.ssl.SslContext;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * SSL context builder for Netty Client side.
+ */
+@Slf4j
+public class NettyClientSslContextRefresher extends SslContextAutoRefreshBuilder<SslContext> {
+ private volatile SslContext sslNettyContext;
+ private boolean tlsAllowInsecureConnection;
+ protected final FileModifiedTimeUpdater tlsTrustCertsFilePath;
+ private AuthenticationDataProvider authData;
+
+ public NettyClientSslContextRefresher(boolean allowInsecure,
+ String trustCertsFilePath,
+ AuthenticationDataProvider authData,
+ long delayInSeconds)
+ throws IOException, GeneralSecurityException {
+ super(delayInSeconds);
+ this.tlsAllowInsecureConnection = allowInsecure;
+ this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+ this.authData = authData;
+ }
+
+ @Override
+ public synchronized SslContext update()
+ throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+ if (authData != null && authData.hasDataForTls()) {
+ this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+ this.tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
+ } else {
+ this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+ this.tlsTrustCertsFilePath.getFileName());
+ }
+ return this.sslNettyContext;
+ }
+
+ @Override
+ public SslContext getSslContext() {
+ return this.sslNettyContext;
+ }
+
+ @Override
+ public boolean needUpdate() {
+ return tlsTrustCertsFilePath.checkAndRefresh();
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
similarity index 52%
rename from pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
index 713c52d..250e628 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettySslContextBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyServerSslContextBuilder.java
@@ -26,16 +26,29 @@ import java.util.Set;
import javax.net.ssl.SSLException;
/**
- * SSL context builder for Netty.
+ * SSL context builder for Netty Server side.
*/
-public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslContext> {
+public class NettyServerSslContextBuilder extends SslContextAutoRefreshBuilder<SslContext> {
private volatile SslContext sslNettyContext;
- public NettySslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
- String keyFilePath, Set<String> ciphers, Set<String> protocols, boolean requireTrustedClientCertOnConnect,
- long delayInSeconds) throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
- super(allowInsecure, trustCertsFilePath, certificateFilePath, keyFilePath, ciphers, protocols,
- requireTrustedClientCertOnConnect, delayInSeconds);
+ protected final boolean tlsAllowInsecureConnection;
+ protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
+ protected final Set<String> tlsCiphers;
+ protected final Set<String> tlsProtocols;
+ protected final boolean tlsRequireTrustedClientCertOnConnect;
+
+ public NettyServerSslContextBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
+ String keyFilePath, Set<String> ciphers, Set<String> protocols,
+ boolean requireTrustedClientCertOnConnect,
+ long delayInSeconds) {
+ super(delayInSeconds);
+ this.tlsAllowInsecureConnection = allowInsecure;
+ this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
+ this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
+ this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
+ this.tlsCiphers = ciphers;
+ this.tlsProtocols = protocols;
+ this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
}
@Override
@@ -52,4 +65,10 @@ public class NettySslContextBuilder extends SslContextAutoRefreshBuilder<SslCont
return this.sslNettyContext;
}
+ @Override
+ public boolean needUpdate() {
+ return tlsTrustCertsFilePath.checkAndRefresh()
+ || tlsCertificateFilePath.checkAndRefresh()
+ || tlsKeyFilePath.checkAndRefresh();
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
index 5fa9c1b..a29f051 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SslContextAutoRefreshBuilder.java
@@ -18,16 +18,10 @@
*/
package org.apache.pulsar.common.util;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
/**
* Auto refresher and builder of SSLContext.
@@ -35,30 +29,18 @@ import org.slf4j.LoggerFactory;
* @param <T>
* type of SSLContext
*/
+@Slf4j
public abstract class SslContextAutoRefreshBuilder<T> {
- protected final boolean tlsAllowInsecureConnection;
- protected final FileModifiedTimeUpdater tlsTrustCertsFilePath, tlsCertificateFilePath, tlsKeyFilePath;
- protected final Set<String> tlsCiphers;
- protected final Set<String> tlsProtocols;
- protected final boolean tlsRequireTrustedClientCertOnConnect;
protected final long refreshTime;
protected long lastRefreshTime;
- public SslContextAutoRefreshBuilder(boolean allowInsecure, String trustCertsFilePath, String certificateFilePath,
- String keyFilePath, Set<String> ciphers, Set<String> protocols, boolean requireTrustedClientCertOnConnect,
- long certRefreshInSec) throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
- this.tlsAllowInsecureConnection = allowInsecure;
- this.tlsTrustCertsFilePath = new FileModifiedTimeUpdater(trustCertsFilePath);
- this.tlsCertificateFilePath = new FileModifiedTimeUpdater(certificateFilePath);
- this.tlsKeyFilePath = new FileModifiedTimeUpdater(keyFilePath);
- this.tlsCiphers = ciphers;
- this.tlsProtocols = protocols;
- this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+ public SslContextAutoRefreshBuilder(
+ long certRefreshInSec) {
this.refreshTime = TimeUnit.SECONDS.toMillis(certRefreshInSec);
this.lastRefreshTime = -1;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Certs will be refreshed every {} seconds", certRefreshInSec);
+ if (log.isDebugEnabled()) {
+ log.debug("Certs will be refreshed every {} seconds", certRefreshInSec);
}
}
@@ -79,6 +61,13 @@ public abstract class SslContextAutoRefreshBuilder<T> {
protected abstract T getSslContext();
/**
+ * Returns whether the key files modified after a refresh time, and context need update.
+ *
+ * @return true if files modified
+ */
+ protected abstract boolean needUpdate();
+
+ /**
* It updates SSLContext at every configured refresh time and returns updated SSLContext.
*
* @return
@@ -91,24 +80,21 @@ public abstract class SslContextAutoRefreshBuilder<T> {
lastRefreshTime = System.currentTimeMillis();
return getSslContext();
} catch (GeneralSecurityException | IOException e) {
- LOG.error("Execption while trying to refresh ssl Context {}", e.getMessage(), e);
+ log.error("Exception while trying to refresh ssl Context {}", e.getMessage(), e);
}
} else {
long now = System.currentTimeMillis();
if (refreshTime <= 0 || now > (lastRefreshTime + refreshTime)) {
- if (tlsTrustCertsFilePath.checkAndRefresh() || tlsCertificateFilePath.checkAndRefresh()
- || tlsKeyFilePath.checkAndRefresh()) {
+ if (needUpdate()) {
try {
ctx = update();
lastRefreshTime = now;
} catch (GeneralSecurityException | IOException e) {
- LOG.error("Execption while trying to refresh ssl Context {} ", e.getMessage(), e);
+ log.error("Exception while trying to refresh ssl Context {} ", e.getMessage(), e);
}
}
}
}
return ctx;
}
-
- private static final Logger LOG = LoggerFactory.getLogger(SslContextAutoRefreshBuilder.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
new file mode 100644
index 0000000..b9ad2e7
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import static org.apache.pulsar.common.util.SecurityUtility.getProvider;
+
+import com.google.common.base.Strings;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Provider;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * KeyStoreSSLContext that mainly wrap a SSLContext to provide SSL context for both webservice and netty.
+ */
+@Slf4j
+public class KeyStoreSSLContext {
+ public static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+ public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+ public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+ public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+ public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+ public static final Provider BC_PROVIDER = getProvider();
+
+ /**
+ * Connection Mode for TLS.
+ */
+ public enum Mode {
+ CLIENT,
+ SERVER
+ }
+
+ @Getter
+ private final Mode mode;
+
+ private String sslProviderString;
+ private String keyStoreTypeString;
+ private String keyStorePath;
+ private String keyStorePassword;
+ private boolean allowInsecureConnection;
+ private String trustStoreTypeString;
+ private String trustStorePath;
+ private String trustStorePassword;
+ private boolean needClientAuth;
+ private Set<String> ciphers;
+ private Set<String> protocols;
+ @Getter
+ private SSLContext sslContext;
+
+ private String protocol = DEFAULT_SSL_PROTOCOL;
+ private String kmfAlgorithm = DEFAULT_SSL_KEYMANGER_ALGORITHM;
+ private String tmfAlgorithm = DEFAULT_SSL_TRUSTMANAGER_ALGORITHM;
+
+ // only init vars, before using it, need to call createSSLContext to create ssl context.
+ public KeyStoreSSLContext(Mode mode,
+ String sslProviderString,
+ String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect,
+ Set<String> ciphers,
+ Set<String> protocols) {
+ this.mode = mode;
+ this.sslProviderString = sslProviderString;
+ this.keyStoreTypeString = Strings.isNullOrEmpty(keyStoreTypeString)
+ ? DEFAULT_KEYSTORE_TYPE
+ : keyStoreTypeString;
+ this.keyStorePath = keyStorePath;
+ this.keyStorePassword = keyStorePassword;
+ this.trustStoreTypeString = Strings.isNullOrEmpty(trustStoreTypeString)
+ ? DEFAULT_KEYSTORE_TYPE
+ : trustStoreTypeString;
+ this.trustStorePath = trustStorePath;
+ this.trustStorePassword = trustStorePassword;
+ this.needClientAuth = requireTrustedClientCertOnConnect;
+ this.ciphers = ciphers;
+ this.protocols = protocols;
+
+ if (protocols != null && protocols.size() > 0) {
+ this.protocols = protocols;
+ } else {
+ this.protocols = new HashSet<>(Arrays.asList(DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")));
+ }
+
+ if (ciphers != null && ciphers.size() > 0) {
+ this.ciphers = ciphers;
+ } else {
+ this.ciphers = null;
+ }
+
+ this.allowInsecureConnection = allowInsecureConnection;
+ }
+
+ public SSLContext createSSLContext() throws GeneralSecurityException, IOException {
+ SSLContext sslContext;
+ if (sslProviderString != null) {
+ sslContext = SSLContext.getInstance(protocol, sslProviderString);
+ } else {
+ sslContext = SSLContext.getInstance(protocol);
+ }
+
+ // key store
+ KeyManager[] keyManagers = null;
+ if (!Strings.isNullOrEmpty(keyStorePath)) {
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(kmfAlgorithm);
+ KeyStore keyStore = KeyStore.getInstance(keyStoreTypeString);
+ char[] passwordChars = keyStorePassword.toCharArray();
+ keyStore.load(new FileInputStream(keyStorePath), passwordChars);
+ keyManagerFactory.init(keyStore, passwordChars);
+ keyManagers = keyManagerFactory.getKeyManagers();
+ }
+
+ // trust store
+ TrustManagerFactory trustManagerFactory;
+ if (this.allowInsecureConnection) {
+ trustManagerFactory = InsecureTrustManagerFactory.INSTANCE;
+ } else {
+ trustManagerFactory = TrustManagerFactory.getInstance(tmfAlgorithm);
+ KeyStore trustStore = KeyStore.getInstance(trustStoreTypeString);
+ char[] passwordChars = trustStorePassword.toCharArray();
+ trustStore.load(new FileInputStream(trustStorePath), passwordChars);
+ trustManagerFactory.init(trustStore);
+ }
+
+ // init
+ sslContext.init(keyManagers, trustManagerFactory.getTrustManagers(), new SecureRandom());
+ this.sslContext = sslContext;
+ return sslContext;
+ }
+
+ public SSLEngine createSSLEngine() {
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+
+ sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
+ sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
+
+ if (this.mode == Mode.SERVER) {
+ sslEngine.setNeedClientAuth(this.needClientAuth);
+ sslEngine.setUseClientMode(false);
+ } else {
+ sslEngine.setUseClientMode(true);
+ }
+
+ return sslEngine;
+ }
+
+ public static KeyStoreSSLContext createClientKeyStoreSslContext(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword,
+ Set<String> ciphers,
+ Set<String> protocols)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+ sslProviderString,
+ keyStoreTypeString,
+ keyStorePath,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStorePath,
+ trustStorePassword,
+ false,
+ ciphers,
+ protocols);
+
+ keyStoreSSLContext.createSSLContext();
+ return keyStoreSSLContext;
+ }
+
+
+ public static KeyStoreSSLContext createServerKeyStoreSslContext(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect,
+ Set<String> ciphers,
+ Set<String> protocols)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.SERVER,
+ sslProviderString,
+ keyStoreTypeString,
+ keyStorePath,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStorePath,
+ trustStorePassword,
+ requireTrustedClientCertOnConnect,
+ ciphers,
+ protocols);
+
+ keyStoreSSLContext.createSSLContext();
+ return keyStoreSSLContext;
+ }
+
+ // for web server use case, no need ciphers and protocols
+ public static SSLContext createServerSslContext(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+
+ return createServerKeyStoreSslContext(
+ sslProviderString,
+ keyStoreTypeString,
+ keyStorePath,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStorePath,
+ trustStorePassword,
+ requireTrustedClientCertOnConnect,
+ null,
+ null).getSslContext();
+ }
+
+ // for web client
+ public static SSLContext createClientSslContext(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword,
+ Set<String> ciphers,
+ Set<String> protocol)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+ sslProviderString,
+ keyStoreTypeString,
+ keyStorePath,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStorePath,
+ trustStorePassword,
+ false,
+ ciphers,
+ protocol);
+
+ return keyStoreSSLContext.createSSLContext();
+ }
+
+ // for web client
+ public static SSLContext createClientSslContext(String keyStoreTypeString,
+ String keyStorePath,
+ String keyStorePassword,
+ String trustStoreTypeString,
+ String trustStorePath,
+ String trustStorePassword)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ KeyStoreSSLContext keyStoreSSLContext = new KeyStoreSSLContext(Mode.CLIENT,
+ null,
+ keyStoreTypeString,
+ keyStorePath,
+ keyStorePassword,
+ false,
+ trustStoreTypeString,
+ trustStorePath,
+ trustStorePassword,
+ false,
+ null,
+ null);
+
+ return keyStoreSSLContext.createSSLContext();
+ }
+
+ // for web server. autoRefresh is default true.
+ public static SslContextFactory createSslContextFactory(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStore,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStore,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect,
+ long certRefreshInSec)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+ SslContextFactory sslCtxFactory;
+
+ sslCtxFactory = new SslContextFactoryWithAutoRefresh(
+ sslProviderString,
+ keyStoreTypeString,
+ keyStore,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStore,
+ trustStorePassword,
+ requireTrustedClientCertOnConnect,
+ certRefreshInSec);
+
+ if (requireTrustedClientCertOnConnect) {
+ sslCtxFactory.setNeedClientAuth(true);
+ } else {
+ sslCtxFactory.setWantClientAuth(true);
+ }
+ sslCtxFactory.setTrustAll(true);
+
+ return sslCtxFactory;
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java
new file mode 100644
index 0000000..38ebdb4
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NetSslContextBuilder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+
+/**
+ * Similar to `DefaultSslContextBuilder`, which build `javax.net.ssl.SSLContext` for web service.
+ */
+public class NetSslContextBuilder extends SslContextAutoRefreshBuilder<SSLContext> {
+ private volatile SSLContext sslContext;
+
+ protected final boolean tlsAllowInsecureConnection;
+ protected final boolean tlsRequireTrustedClientCertOnConnect;
+
+ protected final String tlsProvider;
+ protected final String tlsKeyStoreType;
+ protected final String tlsKeyStorePassword;
+ protected final FileModifiedTimeUpdater tlsKeyStore;
+ protected final String tlsTrustStoreType;
+ protected final String tlsTrustStorePassword;
+ protected final FileModifiedTimeUpdater tlsTrustStore;
+
+ public NetSslContextBuilder(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStore,
+ String keyStorePasswordPath,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStore,
+ String trustStorePasswordPath,
+ boolean requireTrustedClientCertOnConnect,
+ long certRefreshInSec) {
+ super(certRefreshInSec);
+
+ this.tlsAllowInsecureConnection = allowInsecureConnection;
+ this.tlsProvider = sslProviderString;
+ this.tlsKeyStoreType = keyStoreTypeString;
+ this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
+ this.tlsKeyStorePassword = keyStorePasswordPath;
+
+ this.tlsTrustStoreType = trustStoreTypeString;
+ this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+ this.tlsTrustStorePassword = trustStorePasswordPath;
+
+ this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+ }
+
+ @Override
+ public synchronized SSLContext update()
+ throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+ this.sslContext = KeyStoreSSLContext.createServerSslContext(tlsProvider,
+ tlsKeyStoreType, tlsKeyStore.getFileName(), tlsKeyStorePassword,
+ tlsAllowInsecureConnection,
+ tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+ tlsRequireTrustedClientCertOnConnect);
+ return this.sslContext;
+ }
+
+ @Override
+ public SSLContext getSslContext() {
+ return this.sslContext;
+ }
+
+ @Override
+ public boolean needUpdate() {
+ return tlsKeyStore.checkAndRefresh()
+ || tlsTrustStore.checkAndRefresh();
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
new file mode 100644
index 0000000..363fe1e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/NettySSLContextAutoRefreshBuilder.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Set;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.KeyStoreParams;
+import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+
+/**
+ * SSL context builder for Netty.
+ */
+public class NettySSLContextAutoRefreshBuilder extends SslContextAutoRefreshBuilder<KeyStoreSSLContext> {
+ private volatile KeyStoreSSLContext keyStoreSSLContext;
+
+ protected final boolean tlsAllowInsecureConnection;
+ protected final Set<String> tlsCiphers;
+ protected final Set<String> tlsProtocols;
+ protected boolean tlsRequireTrustedClientCertOnConnect;
+
+ protected final String tlsProvider;
+ protected final String tlsTrustStoreType;
+ protected final String tlsTrustStorePassword;
+ protected final FileModifiedTimeUpdater tlsTrustStore;
+
+ // client context not need keystore at start time, keyStore is passed in by authData.
+ protected String tlsKeyStoreType;
+ protected String tlsKeyStorePassword;
+ protected FileModifiedTimeUpdater tlsKeyStore;
+
+ protected AuthenticationDataProvider authData;
+ protected final boolean isServer;
+
+ // for server
+ public NettySSLContextAutoRefreshBuilder(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStore,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStore,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect,
+ Set<String> ciphers,
+ Set<String> protocols,
+ long certRefreshInSec) {
+ super(certRefreshInSec);
+
+ this.tlsAllowInsecureConnection = allowInsecureConnection;
+ this.tlsProvider = sslProviderString;
+
+ this.tlsKeyStoreType = keyStoreTypeString;
+ this.tlsKeyStore = new FileModifiedTimeUpdater(keyStore);
+ this.tlsKeyStorePassword = keyStorePassword;
+
+ this.tlsTrustStoreType = trustStoreTypeString;
+ this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+ this.tlsTrustStorePassword = trustStorePassword;
+
+ this.tlsRequireTrustedClientCertOnConnect = requireTrustedClientCertOnConnect;
+ this.tlsCiphers = ciphers;
+ this.tlsProtocols = protocols;
+
+ this.isServer = true;
+ }
+
+ // for client
+ public NettySSLContextAutoRefreshBuilder(String sslProviderString,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStore,
+ String trustStorePassword,
+ Set<String> ciphers,
+ Set<String> protocols,
+ long certRefreshInSec,
+ AuthenticationDataProvider authData) {
+ super(certRefreshInSec);
+
+ this.tlsAllowInsecureConnection = allowInsecureConnection;
+ this.tlsProvider = sslProviderString;
+
+ this.authData = authData;
+
+ this.tlsTrustStoreType = trustStoreTypeString;
+ this.tlsTrustStore = new FileModifiedTimeUpdater(trustStore);
+ this.tlsTrustStorePassword = trustStorePassword;
+
+ this.tlsCiphers = ciphers;
+ this.tlsProtocols = protocols;
+
+ this.isServer = false;
+ }
+
+ @Override
+ public synchronized KeyStoreSSLContext update() throws GeneralSecurityException, IOException {
+ if (isServer) {
+ this.keyStoreSSLContext = KeyStoreSSLContext.createServerKeyStoreSslContext(tlsProvider,
+ tlsKeyStoreType, tlsKeyStore.getFileName(), tlsKeyStorePassword,
+ tlsAllowInsecureConnection,
+ tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+ tlsRequireTrustedClientCertOnConnect, tlsCiphers, tlsProtocols);
+ } else {
+ KeyStoreParams authParams = authData.getTlsKeyStoreParams();
+ this.keyStoreSSLContext = KeyStoreSSLContext.createClientKeyStoreSslContext(tlsProvider,
+ authParams != null ? authParams.getKeyStoreType() : null,
+ authParams != null ? authParams.getKeyStorePath() : null,
+ authParams != null ? authParams.getKeyStorePassword() : null,
+ tlsAllowInsecureConnection,
+ tlsTrustStoreType, tlsTrustStore.getFileName(), tlsTrustStorePassword,
+ tlsCiphers, tlsProtocols);
+ }
+ return this.keyStoreSSLContext;
+ }
+
+ @Override
+ public KeyStoreSSLContext getSslContext() {
+ return this.keyStoreSSLContext;
+ }
+
+ @Override
+ public boolean needUpdate() {
+ return (tlsKeyStore != null && tlsKeyStore.checkAndRefresh())
+ || (tlsTrustStore != null && tlsTrustStore.checkAndRefresh());
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
new file mode 100644
index 0000000..555d96e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.nio.ByteBuffer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLParameters;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * SSLContextValidatorEngine to validate 2 SSlContext.
+ */
+@Slf4j
+public class SSLContextValidatorEngine {
+ /**
+ * Mode of peer.
+ */
+ public enum Mode {
+ CLIENT,
+ SERVER
+ }
+
+ private static final ByteBuffer EMPTY_BUF = ByteBuffer.allocate(0);
+ private final SSLEngine sslEngine;
+ private SSLEngineResult handshakeResult;
+ private ByteBuffer appBuffer;
+ private ByteBuffer netBuffer;
+ private Mode mode;
+
+ public static void validate(SSLContext clientSslContext, SSLContext serverSslContext) throws SSLException {
+ SSLContextValidatorEngine clientEngine = new SSLContextValidatorEngine(clientSslContext, Mode.CLIENT);
+ SSLContextValidatorEngine serverEngine = new SSLContextValidatorEngine(serverSslContext, Mode.SERVER);
+ try {
+ clientEngine.beginHandshake();
+ serverEngine.beginHandshake();
+ while (!serverEngine.complete() || !clientEngine.complete()) {
+ clientEngine.handshake(serverEngine);
+ serverEngine.handshake(clientEngine);
+ }
+ } finally {
+ clientEngine.close();
+ serverEngine.close();
+ }
+ }
+
+ private SSLContextValidatorEngine(SSLContext sslContext, Mode mode) {
+ this.mode = mode;
+ this.sslEngine = createSslEngine(sslContext, "localhost", 0); // these hints are not used for validation
+ sslEngine.setUseClientMode(mode == Mode.CLIENT);
+ appBuffer = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
+ netBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+ }
+
+ private SSLEngine createSslEngine(SSLContext sslContext, String peerHost, int peerPort) {
+ SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+
+ if (mode == Mode.SERVER) {
+ sslEngine.setNeedClientAuth(true);
+ } else {
+ sslEngine.setUseClientMode(true);
+ SSLParameters sslParams = sslEngine.getSSLParameters();
+ sslEngine.setSSLParameters(sslParams);
+ }
+ return sslEngine;
+ }
+
+ void beginHandshake() throws SSLException {
+ sslEngine.beginHandshake();
+ }
+
+ void handshake(SSLContextValidatorEngine peerEngine) throws SSLException {
+ SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+ while (true) {
+ switch (handshakeStatus) {
+ case NEED_WRAP:
+ handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
+ switch (handshakeResult.getStatus()) {
+ case OK: break;
+ case BUFFER_OVERFLOW:
+ netBuffer.compact();
+ netBuffer = ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+ netBuffer.flip();
+ break;
+ case BUFFER_UNDERFLOW:
+ case CLOSED:
+ default:
+ throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+ }
+ return;
+ case NEED_UNWRAP:
+ if (peerEngine.netBuffer.position() == 0) {
+ return;
+ }
+ peerEngine.netBuffer.flip(); // unwrap the data from peer
+ handshakeResult = sslEngine.unwrap(peerEngine.netBuffer, appBuffer);
+ peerEngine.netBuffer.compact();
+ handshakeStatus = handshakeResult.getHandshakeStatus();
+ switch (handshakeResult.getStatus()) {
+ case OK: break;
+ case BUFFER_OVERFLOW:
+ appBuffer = ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
+ break;
+ case BUFFER_UNDERFLOW:
+ netBuffer = ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+ break;
+ case CLOSED:
+ default:
+ throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+ }
+ break;
+ case NEED_TASK:
+ sslEngine.getDelegatedTask().run();
+ handshakeStatus = sslEngine.getHandshakeStatus();
+ break;
+ case FINISHED:
+ return;
+ case NOT_HANDSHAKING:
+ if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED) {
+ throw new SSLException("Did not finish handshake");
+ }
+ return;
+ default:
+ throw new IllegalStateException("Unexpected handshake status " + handshakeStatus);
+ }
+ }
+ }
+
+ boolean complete() {
+ return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED
+ || sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+ }
+
+ void close() {
+ sslEngine.closeOutbound();
+ try {
+ sslEngine.closeInbound();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ /**
+ * Check if the given ByteBuffer capacity.
+ * @param existingBuffer ByteBuffer capacity to check
+ * @param newLength new length for the ByteBuffer.
+ * returns ByteBuffer
+ */
+ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) {
+ if (newLength > existingBuffer.capacity()) {
+ ByteBuffer newBuffer = ByteBuffer.allocate(newLength);
+ existingBuffer.flip();
+ newBuffer.put(existingBuffer);
+ return newBuffer;
+ }
+ return existingBuffer;
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java
new file mode 100644
index 0000000..e18e8c6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SslContextFactoryWithAutoRefresh.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.keystoretls;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * SslContextFactoryWithAutoRefresh that create SSLContext for web server, and refresh in time.
+ */
+public class SslContextFactoryWithAutoRefresh extends SslContextFactory {
+ private final NetSslContextBuilder sslCtxRefresher;
+
+ public SslContextFactoryWithAutoRefresh(String sslProviderString,
+ String keyStoreTypeString,
+ String keyStore,
+ String keyStorePassword,
+ boolean allowInsecureConnection,
+ String trustStoreTypeString,
+ String trustStore,
+ String trustStorePassword,
+ boolean requireTrustedClientCertOnConnect,
+ long certRefreshInSec)
+ throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
+ super();
+ sslCtxRefresher = new NetSslContextBuilder(
+ sslProviderString,
+ keyStoreTypeString,
+ keyStore,
+ keyStorePassword,
+ allowInsecureConnection,
+ trustStoreTypeString,
+ trustStore,
+ trustStorePassword,
+ requireTrustedClientCertOnConnect,
+ certRefreshInSec);
+ }
+
+ @Override
+ public SSLContext getSslContext() {
+ return sslCtxRefresher.get();
+ }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
similarity index 53%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
index 03a9bd3..11a8db4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttps.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/package-info.java
@@ -16,33 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.authentication;
-
-import java.security.cert.X509Certificate;
-
-import javax.servlet.http.HttpServletRequest;
-
-public class AuthenticationDataHttps extends AuthenticationDataHttp {
-
- protected final X509Certificate[] certificates;
-
- public AuthenticationDataHttps(HttpServletRequest request) {
- super(request);
- certificates = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
- }
-
- /*
- * TLS
- */
-
- @Override
- public boolean hasDataFromTls() {
- return (certificates != null);
- }
-
- @Override
- public X509Certificate[] getTlsCertificates() {
- return certificates;
- }
-
-}
+/**
+ * Helpers to work with events from the non-blocking I/O client-server framework.
+ */
+package org.apache.pulsar.common.util.keystoretls;
diff --git a/pulsar-common/src/test/resources/broker.keystore.jks b/pulsar-common/src/test/resources/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-common/src/test/resources/broker.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/broker.truststore.jks b/pulsar-common/src/test/resources/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-common/src/test/resources/broker.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/brokerKeyStorePW.txt b/pulsar-common/src/test/resources/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/brokerTrustStorePW.txt b/pulsar-common/src/test/resources/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/ca-cert b/pulsar-common/src/test/resources/ca-cert
new file mode 100644
index 0000000..32c8d92
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-cert
@@ -0,0 +1,16 @@
+-----BEGIN CERTIFICATE-----
+MIICmDCCAYACCQCYZHWHBQWWnTANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJj
+bjAgFw0yMDA0MjgxMzIxMDBaGA8yMTIwMDQwNDEzMjEwMFowDTELMAkGA1UEBhMC
+Y24wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC5CuNhU2OYdwlrd/04
+FOgXbzGLs/RoWToWq39Q4WSEVr7uaEBktFEyaoCcLYwsWm9ziF0ms5SLqNDptFiS
+c8Ftt1YJdTS+SfFRg+CWUyq8LmA9NsT/X6/Oy3Q/4398stzWdU1X6KXnWr4UIkDo
+wLDJEFCBkJCqK6w0mg92rzu8pPfIzQ8kvTy0ECK0DfRqe7+9YVPVCrR5ZItln/nu
+MnccQ9fQPL0pvTUXkWFAh/GupaUJkvA69SiAfzXJohtdCdrKrxV3m76kHHxoRlcf
+z1MRHq/r1DWBHLuV6c8p22TW4fAke0i/qwjEroiDRGX2MohCaHTJT3xxIbopznBF
+7GfJAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAJOz1oyP6TmRc2t7LcUzfEBFdKGZ
+PRyF7cin8o+c/IBl8svViTFk4dmdGAoeGpRQsn1i+J+AOKNBSRdyydgEFvE8qopR
+h7c9DzfSssf00eRAgdg8oCz2fOXDtLPwBTMe52q8MdJRe4OelRjNFs4VRpyVgZVQ
+13+GMgcj1E8taGqqSBqLccujWNJW1bsoesLzb6bYQWe8WrCPTQxB1NLIoYTZvXoK
+AkHShUEJV502UOxeRhjDjYeearlILIoV82sczXmlFNrhuiYxgIYa9tCywsagenRe
+6/WANyQP5nmCky0odbK0Uh7XweppFdb76FrooWVcd94HZaJBV7PnNdLoj/8=
+-----END CERTIFICATE-----
diff --git a/pulsar-common/src/test/resources/ca-cert.srl b/pulsar-common/src/test/resources/ca-cert.srl
new file mode 100644
index 0000000..aee9981
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-cert.srl
@@ -0,0 +1 @@
+A30DEADB8FD23BED
diff --git a/pulsar-common/src/test/resources/ca-key b/pulsar-common/src/test/resources/ca-key
new file mode 100644
index 0000000..485d220
--- /dev/null
+++ b/pulsar-common/src/test/resources/ca-key
@@ -0,0 +1,30 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQII+lV8LqZ9n4CAggA
+MB0GCWCGSAFlAwQBKgQQOGncQpnXEogdhApthe09awSCBNCErYMzOQEblSjp2HUq
+whuE1l7EUp4Et3cCtMenXoGlNfzMG0llnCmbIJA4j13X2IfGGpYRKNEbkeUUAuX5
+Er8nwWBZw3ux3iD4zYUl2Q69tcnC62eQaA5+Zj5T58i0ptxYXNTZ0p+q9ytMSP30
+9gb8KwJQdXiKZU5UxflLb9TrU7OWi3Ucnjbw4YYmRkwRZGZ6P+fUSC2FpixzI0J6
+73yBI36zfTvFJR5bslIFw2CSHbIFZeJ535oiLVqzOzjJHEFZ+OTeuN+Vh8Cktz3Y
+KAOdD30knci64vugxo5iLLWc/IXQRcuTBNHskPsym5ahYVSM9/+JqMw4OL2xyDbR
++YCdx3iER4PD9ErCXSCdMLsx4izuuULQGAyONtnj9awUwmzKMqEbtr6lLFChgyEc
+TiWpm5sLZPP1SiyjGKnhesiWRyB25b6iC/fSktf10Nrl+Fb1YJeLLSufG+ZVpy5h
+sN2cPkAnymRw9WyEaitkUqNI52GfClOYPJ0H0bU1LssUs2+d8HRHz6GWb3oW2IX8
+046PE6y0kIuUYEryTQ1lzmLdREIOG2yfkcL7ywN1WBwtqPZBV69peA8P3M3T9VEz
+nnTtEL/5VU0M3Bsm6GM8j9fmJDBmBG+6E0hUD6JCuDeBBvTBuknZIbiBf704DjyI
+qPDZhAkkVfD7dylm96GJLn38PrPk8sQPa3IR4zAXga7YHkXvh7HuYc0V9tZNn7mI
+/f3XoDV2wk279TAr8NLDgLGQHK0K1tDTirJXf09KxWj4zZOC914SeASjEhzQZ6se
+K3PbG4ZQnJ4+dAsY9K6Qgc9BxjyeInCdsDoDROtjqNfKSkcSelKhkp8VEJBYinWT
+PcBHt6/1iQvB1fyp+OexBjq9CiUDg/Be2QTUZfVqCkyIVptPnSvPyTg67pvWl4M6
+uRHxsufQ35WsZAhqEr5eQ7mPAvem7XCUJ14hPz8/f5Qm2+llPUVIq9Rpa83GP/TI
+P9W9F6tRj0qpW+QpPlxmyISf6oHiE9IOGmRkdFV0m54JiSQsqGqXr/NCH423LzFd
+21TJVKH02+v0Tgu1A1+HT+dEKQOorBZ3/HQ2NuQuYi+rF3adNEcMVmfy6DlnHkYr
+lDIG0exVC6Bveuzs4BCupKz/g7CbNzAEOO9XwcD/crNRnjE+nzdu/SDNtw5vIlKA
+hSSzEcsgVkqGbeaV5fASgb3pAz50xJHwvX6O4cAOvbcemjUGyd16IxIdI5jPRVvh
+u1BiK3YwSsdtg8sQ54YVbirgQ6SWKIXdN+79luksimbUVnEu8VJS1fu9H5ojefAd
+J9hMeiGht+6LKvyPh6Sa++bCfYRjZmbkX4h6Afc3Wwibh7KnfpAUlt4QzqA7o+x4
+7rCaI/w/uK+EFaqtn67TowAg/iq6Lxd7i9l06JBSC/BA+Hsw6tS86f13qPg2OtTK
+GydNfxnGtfZIMsUUtfldp9mB+afRFqX49joEGGmb2vnm4Q09QaGP5tagpJboIAqO
+c/pxtGWzqokR8sdAwX0oAb1vsPrpY3sbUGqSYJfVR6s4SMXJdbSSiIKzuwrcO+HX
+TSiq2yGGfJBl5bh9E8cnH4NifAJC4kXsBERwy+Ahq/64MRps3EW2tFl6nPuA+HMl
+IXg4wepHtC7w7W9nJ5Tw3b6X4g==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/pulsar-common/src/test/resources/cert-file b/pulsar-common/src/test/resources/cert-file
new file mode 100644
index 0000000..7d94877
--- /dev/null
+++ b/pulsar-common/src/test/resources/cert-file
@@ -0,0 +1,17 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICpTCCAmECAQAwbzEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93
+bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH
+VW5rbm93bjETMBEGA1UEAxMKY2xpZW50dXNlcjCCAbcwggEsBgcqhkjOOAQBMIIB
+HwKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6
+v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQpaSfn+gEexAiwk+7qdf+t8Yb+DtX58ao
+phUPBPuD9tPFHsMCNVQTWhaRMvZ1864rYdcq7/IiAxmd0UgBxwIVAJdgUI8VIwvM
+spK5gqLrhAvwWBz1AoGBAPfhoIXWmz3ey7yrXDa4V7l5lK+7+jrqgvlXTAs9B4Jn
+UVlXjrrUWU/mcQcQgYC0SRZxI+hMKBYTt88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1
+kW6jfwv6ITVi8ftiegEkO8yk8b6oUZCJqIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kq
+A4GEAAKBgBmF8WdZ9Yv1Sf2qjqF19DUSY3YB67B0azz+689y8lZw0tlnSuej0bBE
+NIP6lvgC/PIPFdxvkInZOgB3TsWwkpxHzKbFZTo2Yg2txZ1IH4KX1QggePeybi2m
+E2soysZ2/r3nX2ZSOTdzDLicVo3yyKAuM8u14N0zBeJR9NMdOG1NoDAwLgYJKoZI
+hvcNAQkOMSEwHzAdBgNVHQ4EFgQUXx44DNZ7cUAoduGpv/MC+d5noyIwCwYHKoZI
+zjgEAwUAAzEAMC4CFQCQ2BDtunGs9G0Ra+16OHPaWAI6+QIVAIrGtZWtGka43D+3
+GqOEI5+wGsbh
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/pulsar-common/src/test/resources/cert-signed b/pulsar-common/src/test/resources/cert-signed
new file mode 100644
index 0000000..20db5c0
--- /dev/null
+++ b/pulsar-common/src/test/resources/cert-signed
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDjzCCAncCCQCjDerbj9I77TANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJj
+bjAgFw0yMDA0MjgxMzI4NDJaGA8yMTIwMDQwNDEzMjg0MlowbzEQMA4GA1UEBhMH
+VW5rbm93bjEQMA4GA1UECBMHVW5rbm93bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4G
+A1UEChMHVW5rbm93bjEQMA4GA1UECxMHVW5rbm93bjETMBEGA1UEAxMKY2xpZW50
+dXNlcjCCAbcwggEsBgcqhkjOOAQBMIIBHwKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3
+UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQ
+paSfn+gEexAiwk+7qdf+t8Yb+DtX58aophUPBPuD9tPFHsMCNVQTWhaRMvZ1864r
+Ydcq7/IiAxmd0UgBxwIVAJdgUI8VIwvMspK5gqLrhAvwWBz1AoGBAPfhoIXWmz3e
+y7yrXDa4V7l5lK+7+jrqgvlXTAs9B4JnUVlXjrrUWU/mcQcQgYC0SRZxI+hMKBYT
+t88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1kW6jfwv6ITVi8ftiegEkO8yk8b6oUZCJ
+qIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kqA4GEAAKBgBmF8WdZ9Yv1Sf2qjqF19DUS
+Y3YB67B0azz+689y8lZw0tlnSuej0bBENIP6lvgC/PIPFdxvkInZOgB3TsWwkpxH
+zKbFZTo2Yg2txZ1IH4KX1QggePeybi2mE2soysZ2/r3nX2ZSOTdzDLicVo3yyKAu
+M8u14N0zBeJR9NMdOG1NMA0GCSqGSIb3DQEBBQUAA4IBAQAlf/MlmkGXvOHi68LU
+FoRoDh0UMUVUMYcpf+LicZkOveD0r5J5z6igDQZ5qT7RMfkSkM8pSl7xcuPNzNkT
+teeH29QOaTiYax+T9yAT9p/i2/DwiLbrcSdPT8UKOy5CVPHEtlreHupiezaID0Op
+IFaeuvBaI/HSbRZQ2IdCXTnXSQ+8rkrcoxDyIi9wjaEnWKwAqphgq0C9icNVMleu
+Lz3Wz51Xn03DQTH9uOtZu6kQYzfAEi7Z0hKF98TQ3BmwEwCRf+h5kE3wFbuT9QFh
+uHLeCvNlJoaajT2Qud0YWkIN+z1yVKzT3NdndmNm5SuM2Mzec3b4PHSLpmW0Cnwv
+UW4t
+-----END CERTIFICATE-----
diff --git a/pulsar-common/src/test/resources/client.keystore.jks b/pulsar-common/src/test/resources/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-common/src/test/resources/client.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/client.truststore.jks b/pulsar-common/src/test/resources/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-common/src/test/resources/client.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/clientKeyStorePW.txt b/pulsar-common/src/test/resources/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/clientTrustStorePW.txt b/pulsar-common/src/test/resources/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-common/src/test/resources/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-common/src/test/resources/old/broker.keystore.jks b/pulsar-common/src/test/resources/old/broker.keystore.jks
new file mode 100644
index 0000000..d4526ac
Binary files /dev/null and b/pulsar-common/src/test/resources/old/broker.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/old/broker.truststore.jks b/pulsar-common/src/test/resources/old/broker.truststore.jks
new file mode 100644
index 0000000..0c0e694
Binary files /dev/null and b/pulsar-common/src/test/resources/old/broker.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt b/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt
new file mode 100644
index 0000000..bdc331e
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+broker
diff --git a/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt b/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt
new file mode 100644
index 0000000..bdc331e
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+broker
diff --git a/pulsar-common/src/test/resources/old/client.keystore.jks b/pulsar-common/src/test/resources/old/client.keystore.jks
new file mode 100644
index 0000000..e5d074e
Binary files /dev/null and b/pulsar-common/src/test/resources/old/client.keystore.jks differ
diff --git a/pulsar-common/src/test/resources/old/client.truststore.jks b/pulsar-common/src/test/resources/old/client.truststore.jks
new file mode 100644
index 0000000..36f9d72
Binary files /dev/null and b/pulsar-common/src/test/resources/old/client.truststore.jks differ
diff --git a/pulsar-common/src/test/resources/old/clientKeyStorePW.txt b/pulsar-common/src/test/resources/old/clientKeyStorePW.txt
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/clientKeyStorePW.txt
@@ -0,0 +1 @@
+client
diff --git a/pulsar-common/src/test/resources/old/clientTrustStorePW.txt b/pulsar-common/src/test/resources/old/clientTrustStorePW.txt
new file mode 100644
index 0000000..b051c6c
--- /dev/null
+++ b/pulsar-common/src/test/resources/old/clientTrustStorePW.txt
@@ -0,0 +1 @@
+client
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
index 6052f2b..250259b 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.discovery.service;
+import io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import io.netty.channel.ChannelInitializer;
@@ -36,19 +39,38 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
public static final String TLS_HANDLER = "tls";
private final DiscoveryService discoveryService;
private final boolean enableTls;
- private final NettySslContextBuilder sslCtxRefresher;
+ private final boolean tlsEnabledWithKeyStore;
+ private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
+ private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, boolean e)
throws Exception {
super();
this.discoveryService = discoveryService;
this.enableTls = e;
+ this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
if (this.enableTls) {
- sslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
- serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
- serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
- serviceConfig.getTlsRequireTrustedClientCertOnConnect(),
- serviceConfig.getTlsCertRefreshCheckDurationSec());
+ if (tlsEnabledWithKeyStore) {
+ nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+ serviceConfig.getTlsProvider(),
+ serviceConfig.getTlsKeyStoreType(),
+ serviceConfig.getTlsKeyStore(),
+ serviceConfig.getTlsKeyStorePassword(),
+ serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustStoreType(),
+ serviceConfig.getTlsTrustStore(),
+ serviceConfig.getTlsTrustStorePassword(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCiphers(),
+ serviceConfig.getTlsProtocols(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ } else {
+ sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+ serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ }
} else {
this.sslCtxRefresher = null;
}
@@ -57,9 +79,14 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (sslCtxRefresher != null && this.enableTls) {
- SslContext sslContext = sslCtxRefresher.get();
- if (sslContext != null) {
- ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+ if (this.tlsEnabledWithKeyStore) {
+ ch.pipeline().addLast(TLS_HANDLER,
+ new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+ } else{
+ SslContext sslContext = sslCtxRefresher.get();
+ if (sslContext != null) {
+ ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+ }
}
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
index 945074c..bafc4ac 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
@@ -19,17 +19,16 @@
package org.apache.pulsar.discovery.service.server;
import com.google.common.collect.Lists;
-
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
-
import javax.servlet.Servlet;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.discovery.service.web.RestException;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -74,14 +73,30 @@ public class ServerManager {
if (config.getWebServicePortTls().isPresent()) {
try {
- SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
- config.isTlsAllowInsecureConnection(),
- config.getTlsTrustCertsFilePath(),
- config.getTlsCertificateFilePath(),
- config.getTlsKeyFilePath(),
- config.getTlsRequireTrustedClientCertOnConnect(),
- true,
- config.getTlsCertRefreshCheckDurationSec());
+ SslContextFactory sslCtxFactory;
+ if (config.isTlsEnabledWithKeyStore()) {
+ sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+ config.getTlsProvider(),
+ config.getTlsKeyStoreType(),
+ config.getTlsKeyStore(),
+ config.getTlsKeyStorePassword(),
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustStoreType(),
+ config.getTlsTrustStore(),
+ config.getTlsTrustStorePassword(),
+ config.isTlsRequireTrustedClientCertOnConnect(),
+ config.getTlsCertRefreshCheckDurationSec()
+ );
+ } else {
+ sslCtxFactory = SecurityUtility.createSslContextFactory(
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustCertsFilePath(),
+ config.getTlsCertificateFilePath(),
+ config.getTlsKeyFilePath(),
+ config.isTlsRequireTrustedClientCertOnConnect(),
+ true,
+ config.getTlsCertRefreshCheckDurationSec());
+ }
connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory);
connectorTls.setPort(config.getWebServicePortTls().get());
connectors.add(connectorTls);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index fa750ff..d8c85df 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import lombok.Data;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
@@ -32,6 +33,7 @@ import com.google.common.collect.Sets;
* Service Configuration to start :{@link DiscoveryServiceServlet}
*
*/
+@Data
public class ServiceConfig implements PulsarConfiguration {
// Local-Zookeeper quorum connection string
@@ -78,7 +80,7 @@ public class ServiceConfig implements PulsarConfiguration {
/***** --- TLS --- ****/
@Deprecated
private boolean tlsEnabled = false;
- // Tls cert refresh duration in seconds (set 0 to check on every new connection)
+ // Tls cert refresh duration in seconds (set 0 to check on every new connection)
private long tlsCertRefreshCheckDurationSec = 300;
// Path for the TLS certificate file
private String tlsCertificateFilePath;
@@ -98,209 +100,27 @@ public class ServiceConfig implements PulsarConfiguration {
// Reject the Connection if the Client Certificate is not trusted.
private boolean tlsRequireTrustedClientCertOnConnect = false;
- private Properties properties = new Properties();
-
- public String getZookeeperServers() {
- return zookeeperServers;
- }
+ /***** --- TLS with KeyStore--- ****/
+ // Enable TLS with KeyStore type configuration in broker
+ private boolean tlsEnabledWithKeyStore = false;
+ // TLS Provider
+ private String tlsProvider = null;
+ // TLS KeyStore type configuration in broker: JKS, PKCS12
+ private String tlsKeyStoreType = "JKS";
+ // TLS KeyStore path in broker
+ private String tlsKeyStore = null;
+ // TLS KeyStore password in broker
+ private String tlsKeyStorePassword = null;
+ // TLS TrustStore type configuration in broker: JKS, PKCS12
+ private String tlsTrustStoreType = "JKS";
+ // TLS TrustStore path in broker
+ private String tlsTrustStore = null;
+ // TLS TrustStore password in broker"
+ private String tlsTrustStorePassword = null;
- public void setZookeeperServers(String zookeeperServers) {
- this.zookeeperServers = zookeeperServers;
- }
-
- @Deprecated
- public String getGlobalZookeeperServers() {
- return globalZookeeperServers;
- }
-
- @Deprecated
- public void setGlobalZookeeperServers(String globalZookeeperServers) {
- this.globalZookeeperServers = globalZookeeperServers;
- }
+ private Properties properties = new Properties();
public String getConfigurationStoreServers() {
return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers;
}
-
- public void setConfigurationStoreServers(String configurationStoreServers) {
- this.configurationStoreServers = configurationStoreServers;
- }
-
- public int getZookeeperSessionTimeoutMs() {
- return zookeeperSessionTimeoutMs;
- }
-
- public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
- this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
- }
-
- public Optional<Integer> getServicePort() {
- return servicePort;
- }
-
- public void setServicePort(Optional<Integer> servicePort) {
- this.servicePort = servicePort;
- }
-
- public Optional<Integer> getServicePortTls() {
- return servicePortTls;
- }
-
- public void setServicePortTls(Optional<Integer> servicePortTls) {
- this.servicePortTls = servicePortTls;
- }
-
- public Optional<Integer> getWebServicePort() {
- return webServicePort;
- }
-
- public void setWebServicePort(Optional<Integer> webServicePort) {
- this.webServicePort = webServicePort;
- }
-
- public Optional<Integer> getWebServicePortTls() {
- return webServicePortTls;
- }
-
- public void setWebServicePortTls(Optional<Integer> webServicePortTls) {
- this.webServicePortTls = webServicePortTls;
- }
-
- @Deprecated
- public boolean isTlsEnabled() {
- return tlsEnabled || webServicePortTls != null || servicePortTls != null;
- }
-
- @Deprecated
- public void setTlsEnabled(boolean tlsEnabled) {
- this.tlsEnabled = tlsEnabled;
- }
-
- public String getTlsCertificateFilePath() {
- return tlsCertificateFilePath;
- }
-
- public void setTlsCertificateFilePath(String tlsCertificateFilePath) {
- this.tlsCertificateFilePath = tlsCertificateFilePath;
- }
-
- public String getTlsKeyFilePath() {
- return tlsKeyFilePath;
- }
-
- public void setTlsKeyFilePath(String tlsKeyFilePath) {
- this.tlsKeyFilePath = tlsKeyFilePath;
- }
-
- public String getTlsTrustCertsFilePath() {
- return tlsTrustCertsFilePath;
- }
-
- public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
- this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
- }
-
- public boolean isTlsAllowInsecureConnection() {
- return tlsAllowInsecureConnection;
- }
-
- public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
- this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
- }
-
- public boolean isBindOnLocalhost() {
- return bindOnLocalhost;
- }
-
- public void setBindOnLocalhost(boolean bindOnLocalhost) {
- this.bindOnLocalhost = bindOnLocalhost;
- }
-
- public boolean isAuthenticationEnabled() {
- return authenticationEnabled;
- }
-
- public void setAuthenticationEnabled(boolean authenticationEnabled) {
- this.authenticationEnabled = authenticationEnabled;
- }
-
- public Set<String> getAuthenticationProviders() {
- return authenticationProviders;
- }
-
- public void setAuthenticationProviders(Set<String> authenticationProviders) {
- this.authenticationProviders = authenticationProviders;
- }
-
- public boolean isAuthorizationEnabled() {
- return authorizationEnabled;
- }
-
- public void setAuthorizationEnabled(boolean authorizationEnabled) {
- this.authorizationEnabled = authorizationEnabled;
- }
-
- public String getAuthorizationProvider() {
- return authorizationProvider;
- }
-
- public void setAuthorizationProvider(String authorizationProvider) {
- this.authorizationProvider = authorizationProvider;
- }
-
- public Set<String> getSuperUserRoles() {
- return superUserRoles;
- }
-
- public void setSuperUserRoles(Set<String> superUserRoles) {
- this.superUserRoles = superUserRoles;
- }
-
- public boolean getAuthorizationAllowWildcardsMatching() {
- return authorizationAllowWildcardsMatching;
- }
-
- public void setAuthorizationAllowWildcardsMatching(boolean authorizationAllowWildcardsMatching) {
- this.authorizationAllowWildcardsMatching = authorizationAllowWildcardsMatching;
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public void setProperties(Properties properties) {
- this.properties = properties;
- }
-
- public Set<String> getTlsProtocols() {
- return tlsProtocols;
- }
-
- public void setTlsProtocols(Set<String> tlsProtocols) {
- this.tlsProtocols = tlsProtocols;
- }
-
- public long getTlsCertRefreshCheckDurationSec() {
- return tlsCertRefreshCheckDurationSec;
- }
-
- public void setTlsCertRefreshCheckDurationSec(long tlsCertRefreshCheckDurationSec) {
- this.tlsCertRefreshCheckDurationSec = tlsCertRefreshCheckDurationSec;
- }
-
- public Set<String> getTlsCiphers() {
- return tlsCiphers;
- }
-
- public void setTlsCiphers(Set<String> tlsCiphers) {
- this.tlsCiphers = tlsCiphers;
- }
-
- public boolean getTlsRequireTrustedClientCertOnConnect() {
- return tlsRequireTrustedClientCertOnConnect;
- }
-
- public void setTlsRequireTrustedClientCertOnConnect(boolean tlsRequireTrustedClientCertOnConnect) {
- this.tlsRequireTrustedClientCertOnConnect = tlsRequireTrustedClientCertOnConnect;
- }
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index c2d9eca..10e96b8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -33,11 +33,13 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import java.util.function.Supplier;
+import lombok.Getter;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
@@ -70,18 +72,18 @@ public class DirectProxyHandler {
public static final String TLS_HANDLER = "tls";
private final Authentication authentication;
- private final SslContext sslCtx;
+ private final Supplier<SslHandler> sslHandlerSupplier;
private AuthenticationDataProvider authenticationDataProvider;
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
- int protocolVersion, SslContext sslCtx) {
+ int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
this.originalPrincipal = proxyConnection.clientAuthRole;
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
this.protocolVersion = protocolVersion;
- this.sslCtx = sslCtx;
+ this.sslHandlerSupplier = sslHandlerSupplier;
ProxyConfiguration config = service.getConfiguration();
// Start the connection attempt.
@@ -94,8 +96,8 @@ public class DirectProxyHandler {
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
- if (sslCtx != null) {
- ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+ if (sslHandlerSupplier != null) {
+ ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get());
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 6a293a0..34beaa4 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -60,6 +60,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
@Category
private static final String CATEGORY_TLS = "TLS";
@Category
+ private static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+ @Category
private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
@Category
private static final String CATEGORY_HTTP = "HTTP";
@@ -314,7 +316,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
private Set<String> tlsProtocols = Sets.newTreeSet();
@FieldContext(
category = CATEGORY_TLS,
- doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake"
+ doc = "Specify the tls cipher the proxy will use to negotiate during TLS Handshake"
+ " (a comma-separated list of ciphers).\n\n"
+ "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
)
@@ -326,6 +328,106 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean tlsRequireTrustedClientCertOnConnect = false;
+ /**** --- KeyStore TLS config variables --- ****/
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Enable TLS with KeyStore type configuration for proxy"
+ )
+ private boolean tlsEnabledWithKeyStore = false;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS Provider"
+ )
+ private String tlsProvider = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore type configuration for proxy: JKS, PKCS12"
+ )
+ private String tlsKeyStoreType = "JKS";
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore path for proxy"
+ )
+ private String tlsKeyStore = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS KeyStore password for proxy"
+ )
+ private String tlsKeyStorePassword = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore type configuration for proxy: JKS, PKCS12"
+ )
+ private String tlsTrustStoreType = "JKS";
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore path for proxy"
+ )
+ private String tlsTrustStore = null;
+
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore password for proxy"
+ )
+ private String tlsTrustStorePassword = null;
+
+ /**** --- KeyStore TLS config variables used for proxy to auth with broker--- ****/
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Whether the Pulsar proxy use KeyStore type to authenticate with Pulsar brokers"
+ )
+ private boolean brokerClientTlsEnabledWithKeyStore = false;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "The TLS Provider used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private String brokerClientSslProvider = null;
+
+ // needed when client auth is required
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore type configuration for proxy: JKS, PKCS12 "
+ + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStoreType = "JKS";
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore path for proxy, "
+ + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStore = null;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "TLS TrustStore password for proxy, "
+ + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private String brokerClientTlsTrustStorePassword = null;
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Specify the tls cipher the proxy will use to negotiate during TLS Handshake"
+ + " (a comma-separated list of ciphers).\n\n"
+ + "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
+ + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private Set<String> brokerClientTlsCiphers = Sets.newTreeSet();
+ @FieldContext(
+ category = CATEGORY_KEYSTORE_TLS,
+ doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
+ + " (a comma-separated list of protocol names).\n\n"
+ + "Examples:- [TLSv1.2, TLSv1.1, TLSv1] \n"
+ + " used by the Pulsar proxy to authenticate with Pulsar brokers"
+ )
+ private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();
+
+ /***** --- HTTP --- ****/
+
@FieldContext(
category = CATEGORY_HTTP,
doc = "Http directs to redirect to non-pulsar services"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 9b8b7fb..929b919 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -71,7 +71,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
- private final SslContext sslCtx;
+ private final Supplier<SslHandler> sslHandlerSupplier;
private LookupProxyHandler lookupProxyHandler = null;
private DirectProxyHandler directProxyHandler = null;
@@ -110,11 +110,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return client.getCnxPool();
}
- public ProxyConnection(ProxyService proxyService, SslContext sslCtx) {
+ public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
this.state = State.Init;
- this.sslCtx = sslCtx;
+ this.sslHandlerSupplier = sslHandlerSupplier;
}
@Override
@@ -209,7 +209,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// connection there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
- protocolVersionToAdvertise, sslCtx);
+ protocolVersionToAdvertise, sslHandlerSupplier);
cancelKeepAliveTask();
} else {
// Client is doing a lookup, we can consider the handshake complete
@@ -408,8 +408,15 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
- clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
- clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+ if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
+ clientConf.setUseKeyStoreTls(true);
+ clientConf.setTlsTrustStoreType(proxyConfig.getBrokerClientTlsTrustStoreType());
+ clientConf.setTlsTrustStorePath(proxyConfig.getBrokerClientTlsTrustStore());
+ clientConf.setTlsTrustStorePassword(proxyConfig.getBrokerClientTlsTrustStorePassword());
+ } else {
+ clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+ clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+ }
}
return clientConf;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 156492a..42a5c07 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -20,16 +20,20 @@ package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import io.netty.handler.ssl.SslHandler;
+import java.util.function.Supplier;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.ClientSslContextRefresher;
-import org.apache.pulsar.common.util.NettySslContextBuilder;
+import org.apache.pulsar.common.util.NettyClientSslContextRefresher;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
/**
* Initialize service channel handlers.
@@ -39,22 +43,43 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
public static final String TLS_HANDLER = "tls";
private final ProxyService proxyService;
- private final NettySslContextBuilder serverSslCtxRefresher;
- private final ClientSslContextRefresher clientSslCtxRefresher;
private final boolean enableTls;
+ private final boolean tlsEnabledWithKeyStore;
+
+ private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
+ private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
+ private NettySSLContextAutoRefreshBuilder serverSSLContextAutoRefreshBuilder;
+ private NettySSLContextAutoRefreshBuilder clientSSLContextAutoRefreshBuilder;
public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration serviceConfig, boolean enableTls)
throws Exception {
super();
this.proxyService = proxyService;
this.enableTls = enableTls;
+ this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
if (enableTls) {
- serverSslCtxRefresher = new NettySslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
- serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
- serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
- serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
- serviceConfig.getTlsCertRefreshCheckDurationSec());
+ if (tlsEnabledWithKeyStore) {
+ serverSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+ serviceConfig.getTlsProvider(),
+ serviceConfig.getTlsKeyStoreType(),
+ serviceConfig.getTlsKeyStore(),
+ serviceConfig.getTlsKeyStorePassword(),
+ serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustStoreType(),
+ serviceConfig.getTlsTrustStore(),
+ serviceConfig.getTlsTrustStorePassword(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCiphers(),
+ serviceConfig.getTlsProtocols(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ } else {
+ serverSslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+ serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+ serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ }
} else {
this.serverSslCtxRefresher = null;
}
@@ -67,9 +92,24 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
serviceConfig.getBrokerClientAuthenticationParameters()).getAuthData();
}
- clientSslCtxRefresher = new ClientSslContextRefresher(serviceConfig.isTlsAllowInsecureConnection(),
- serviceConfig.getBrokerClientTrustCertsFilePath(), authData);
-
+ if (tlsEnabledWithKeyStore) {
+ clientSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+ serviceConfig.getBrokerClientSslProvider(),
+ serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getBrokerClientTlsTrustStoreType(),
+ serviceConfig.getBrokerClientTlsTrustStore(),
+ serviceConfig.getBrokerClientTlsTrustStorePassword(),
+ serviceConfig.getBrokerClientTlsCiphers(),
+ serviceConfig.getBrokerClientTlsProtocols(),
+ serviceConfig.getTlsCertRefreshCheckDurationSec(),
+ authData);
+ } else {
+ clientSslCtxRefresher = new NettyClientSslContextRefresher(
+ serviceConfig.isTlsAllowInsecureConnection(),
+ serviceConfig.getBrokerClientTrustCertsFilePath(),
+ authData,
+ serviceConfig.getTlsCertRefreshCheckDurationSec());
+ }
} else {
this.clientSslCtxRefresher = null;
}
@@ -82,11 +122,33 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
if (sslContext != null) {
ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
}
+ } else if (this.tlsEnabledWithKeyStore && serverSSLContextAutoRefreshBuilder != null) {
+ ch.pipeline().addLast(TLS_HANDLER,
+ new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+ Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+
+ Supplier<SslHandler> sslHandlerSupplier = null;
+ if (clientSslCtxRefresher != null) {
+ sslHandlerSupplier = new Supplier<SslHandler>() {
+ @Override
+ public SslHandler get() {
+ return clientSslCtxRefresher.get().newHandler(ch.alloc());
+ }
+ };
+ } else if (clientSSLContextAutoRefreshBuilder != null) {
+ sslHandlerSupplier = new Supplier<SslHandler>() {
+ @Override
+ public SslHandler get() {
+ return new SslHandler(clientSSLContextAutoRefreshBuilder.get().createSSLEngine());
+ }
+ };
+ }
+
ch.pipeline().addLast("handler",
- new ProxyConnection(proxyService, clientSslCtxRefresher == null ? null : clientSslCtxRefresher.get()));
+ new ProxyConnection(proxyService, sslHandlerSupplier));
+
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 0b8dca2..3a45399 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,9 +19,7 @@
package org.apache.pulsar.proxy.server;
import com.google.common.collect.Lists;
-
import io.prometheus.client.jetty.JettyStatisticsCollector;
-
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -31,15 +29,14 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
-
import javax.servlet.DispatcherType;
-
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.broker.web.JsonMapperProvider;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
@@ -99,14 +96,30 @@ public class WebServer {
}
if (config.getWebServicePortTls().isPresent()) {
try {
- SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
- config.isTlsAllowInsecureConnection(),
- config.getTlsTrustCertsFilePath(),
- config.getTlsCertificateFilePath(),
- config.getTlsKeyFilePath(),
- config.isTlsRequireTrustedClientCertOnConnect(),
- true,
- config.getTlsCertRefreshCheckDurationSec());
+ SslContextFactory sslCtxFactory;
+ if (config.isTlsEnabledWithKeyStore()) {
+ sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+ config.getTlsProvider(),
+ config.getTlsKeyStoreType(),
+ config.getTlsKeyStore(),
+ config.getTlsKeyStorePassword(),
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustStoreType(),
+ config.getTlsTrustStore(),
+ config.getTlsTrustStorePassword(),
+ config.isTlsRequireTrustedClientCertOnConnect(),
+ config.getTlsCertRefreshCheckDurationSec()
+ );
+ } else {
+ sslCtxFactory = SecurityUtility.createSslContextFactory(
+ config.isTlsAllowInsecureConnection(),
+ config.getTlsTrustCertsFilePath(),
+ config.getTlsCertificateFilePath(),
+ config.getTlsKeyFilePath(),
+ config.isTlsRequireTrustedClientCertOnConnect(),
+ true,
+ config.getTlsCertRefreshCheckDurationSec());
+ }
connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory);
connectorTls.setPort(config.getWebServicePortTls().get());
connectors.add(connectorTls);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
new file mode 100644
index 0000000..51354ed
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.doReturn;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_CN = "clientuser";
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ internalSetup();
+
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setServicePortTls(Optional.of(0));
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setWebServicePortTls(Optional.of(0));
+ proxyConfig.setTlsEnabledWithBroker(false);
+
+ proxyConfig.setTlsEnabledWithKeyStore(true);
+ proxyConfig.setTlsKeyStoreType(KEYSTORE_TYPE);
+ proxyConfig.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+ proxyConfig.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+ proxyConfig.setTlsTrustStoreType(KEYSTORE_TYPE);
+ proxyConfig.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+ proxyConfig.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+
+
+ // config for authentication and authorization.
+ proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
+ proxyConfig.setSuperUserRoles(Sets.newHashSet(CLIENT_KEYSTORE_CN));
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setAuthorizationEnabled(true);
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+ proxyService.start();
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ internalCleanup();
+
+ proxyService.close();
+ }
+
+ protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(lookupUrl)
+ .enableTls(true)
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ if (addCertificates) {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+ }
+ return clientBuilder.build();
+ }
+
+ @Test
+ public void testProducer() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+ }
+
+ @Test
+ public void testProducerFailed() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(false, proxyService.getServiceUrlTls());
+ try {
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+ .create();
+ Assert.fail("Should failed since broker setTlsRequireTrustedClientCertOnConnect, "
+ + "while client not set keystore");
+ } catch (Exception e) {
+ // expected
+ log.info("Expected failed since broker setTlsRequireTrustedClientCertOnConnect,"
+ + " while client not set keystore");
+ }
+ }
+
+ @Test
+ public void testPartitions() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+ String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis();
+ admin.tenants().createTenant("sample", new TenantInfo());
+ admin.topics().createPartitionedTopic(topicName, 2);
+
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(topicName)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+ // Create a consumer directly attached to broker
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-sub").subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ }
+ }
+
+
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
new file mode 100644
index 0000000..e6ce925
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.doReturn;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest {
+ protected final String BROKER_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.keystore.jks";
+ protected final String BROKER_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/broker.truststore.jks";
+ protected final String BROKER_KEYSTORE_PW = "111111";
+ protected final String BROKER_TRUSTSTORE_PW = "111111";
+
+ protected final String CLIENT_KEYSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.keystore.jks";
+ protected final String CLIENT_TRUSTSTORE_FILE_PATH =
+ "./src/test/resources/authentication/keystoretls/client.truststore.jks";
+ protected final String CLIENT_KEYSTORE_PW = "111111";
+ protected final String CLIENT_TRUSTSTORE_PW = "111111";
+
+ protected final String KEYSTORE_TYPE = "JKS";
+
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ internalSetup();
+
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setServicePortTls(Optional.of(0));
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setWebServicePortTls(Optional.of(0));
+ proxyConfig.setTlsEnabledWithBroker(false);
+
+ proxyConfig.setTlsEnabledWithKeyStore(true);
+ proxyConfig.setTlsKeyStoreType(KEYSTORE_TYPE);
+ proxyConfig.setTlsKeyStore(BROKER_KEYSTORE_FILE_PATH);
+ proxyConfig.setTlsKeyStorePassword(BROKER_KEYSTORE_PW);
+ proxyConfig.setTlsTrustStoreType(KEYSTORE_TYPE);
+ proxyConfig.setTlsTrustStore(CLIENT_TRUSTSTORE_FILE_PATH);
+ proxyConfig.setTlsTrustStorePassword(CLIENT_TRUSTSTORE_PW);
+ proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
+
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+ proxyService.start();
+ }
+
+ protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(lookupUrl)
+ .enableTls(true)
+ .useKeyStoreTls(true)
+ .tlsTrustStorePath(BROKER_TRUSTSTORE_FILE_PATH)
+ .tlsTrustStorePassword(BROKER_TRUSTSTORE_PW)
+ .allowTlsInsecureConnection(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ if (addCertificates) {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH);
+ authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW);
+ clientBuilder.authentication(AuthenticationKeyStoreTls.class.getName(), authParams);
+ }
+ return clientBuilder.build();
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ proxyService.close();
+ }
+
+ @Test
+ public void testProducer() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+ }
+
+ @Test
+ public void testProducerFailed() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(false, proxyService.getServiceUrlTls());
+ try {
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/topic" + System.currentTimeMillis())
+ .create();
+ Assert.fail("Should failed since broker setTlsRequireTrustedClientCertOnConnect, "
+ + "while client not set keystore");
+ } catch (Exception e) {
+ // expected
+ log.info("Expected failed since broker setTlsRequireTrustedClientCertOnConnect,"
+ + " while client not set keystore");
+ }
+ }
+
+ @Test
+ public void testPartitions() throws Exception {
+ @Cleanup
+ PulsarClient client = internalSetUpForClient(true, proxyService.getServiceUrlTls());
+ String topicName = "persistent://sample/test/local/partitioned-topic" + System.currentTimeMillis();
+ admin.tenants().createTenant("sample", new TenantInfo());
+ admin.topics().createPartitionedTopic(topicName, 2);
+
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic(topicName)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+ // Create a consumer directly attached to broker
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-sub").subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ }
+ }
+
+}
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks
new file mode 100644
index 0000000..b4fec69
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.keystore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks
new file mode 100644
index 0000000..8ac03d8
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/broker.truststore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/brokerTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks
new file mode 100644
index 0000000..499c8be
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.keystore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks
new file mode 100644
index 0000000..8eaa06b
Binary files /dev/null and b/pulsar-proxy/src/test/resources/authentication/keystoretls/client.truststore.jks differ
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientKeyStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
new file mode 100644
index 0000000..90d2950
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/keystoretls/clientTrustStorePW.txt
@@ -0,0 +1 @@
+111111
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index f08656f..642a7c0 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -146,6 +146,18 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|tlsAllowInsecureConnection| Accept untrusted TLS certificate from client |false|
|tlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
|tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- ```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tlsEnabledWithKeyStore| Enable TLS with KeyStore type configuration in broker |false|
+|tlsProvider| TLS Provider for KeyStore type ||
+|tlsKeyStoreType| LS KeyStore type configuration in broker: JKS, PKCS12 |JKS|
+|tlsKeyStore| TLS KeyStore path in broker ||
+|tlsKeyStorePassword| TLS KeyStore password for broker ||
+|brokerClientTlsEnabledWithKeyStore| Whether internal client use KeyStore type to authenticate with Pulsar brokers |false|
+|brokerClientSslProvider| The TLS Provider used by internal client to authenticate with other Pulsar brokers ||
+|brokerClientTlsTrustStoreType| TLS TrustStore type configuration for internal client: JKS, PKCS12, used by the internal client to authenticate with Pulsar brokers |JKS|
+|brokerClientTlsTrustStore| TLS TrustStore path for internal client, used by the internal client to authenticate with Pulsar brokers ||
+|brokerClientTlsTrustStorePassword| TLS TrustStore password for internal client, used by the internal client to authenticate with Pulsar brokers ||
+|brokerClientTlsCiphers| Specify the tls cipher the internal client will use to negotiate during TLS Handshake. (a comma-separated list of ciphers) e.g. [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]||
+|brokerClientTlsProtocols|Specify the tls protocols the broker will use to negotiate during TLS handshake. (a comma-separated list of protocol names). e.g. [TLSv1.2, TLSv1.1, TLSv1] ||
|tokenSecretKey| Configure the secret key to be used to validate auth tokens. The key can be specified like: `tokenSecretKey=data:base64,xxxxxxxxx` or `tokenSecretKey=file:///my/secret.key`||
|tokenPublicKey| Configure the public key to be used to validate auth tokens. The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or `tokenPublicKey=file:///my/secret.key`||
|tokenPublicAlg| Configure the algorithm to be used to validate auth tokens. This can be any of the asymettric algorithms supported by Java JWT (https://github.com/jwtk/jjwt#signature-algorithms-keys) |RS256|