You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/09/24 20:38:20 UTC
[pulsar] branch master updated: Always use SNI for TLS enabled
Pulsar Java client. (#8117)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f2933f7 Always use SNI for TLS enabled Pulsar Java client. (#8117)
f2933f7 is described below
commit f2933f7da4850814d92fac0e54c5314c51c8fc32
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Thu Sep 24 22:37:51 2020 +0200
Always use SNI for TLS enabled Pulsar Java client. (#8117)
Co-authored-by: Rolf Arne Corneliussen <ro...@addsecure.com>
---
.../org/apache/pulsar/client/api/TlsSniTest.java | 66 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConnectionPool.java | 56 +++++++-----------
.../client/impl/PulsarChannelInitializer.java | 58 +++++++++----------
.../util/keystoretls/KeyStoreSSLContext.java | 10 +++-
4 files changed, 124 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
new file mode 100644
index 0000000..fc8c242
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+public class TlsSniTest extends TlsProducerConsumerBase {
+
+ /**
+ * Verify that using an IP address in the broker service URL works with the SNI capabilities
+ * of the client. If we try to create an {@link javax.net.ssl.SSLEngine} with a peer host that is an
+ * IP address, the peer host is ignored, see for example
+ * {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}.
+ *
+ */
+ @Test
+ public void testIpAddressInBrokerServiceUrl() throws Exception {
+ String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+ URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls());
+
+ String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d",
+ InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(),
+ brokerServiceUrlTls.getPort());
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
+ .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .operationTimeout(1000, TimeUnit.MILLISECONDS);
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);
+
+ @Cleanup
+ PulsarClient pulsarClient = clientBuilder.build();
+ // should be able to create producer successfully
+ pulsarClient.newProducer().topic(topicName).create();
+ }
+}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 32d7195..6203990 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -90,7 +90,7 @@ public class ConnectionPool implements Closeable {
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try {
- channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy);
+ channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
bootstrap.handler(channelInitializerHandler);
} catch (Exception e) {
log.error("Failed to create channel initializer");
@@ -293,39 +293,12 @@ public class ConnectionPool implements Closeable {
* Attempt to establish a TCP connection to an already resolved single IP address
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
- CompletableFuture<Channel> future = new CompletableFuture<>();
- // if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler
- if (isSniProxy) {
- bootstrap.register().addListener((ChannelFuture cf) -> {
- if (!cf.isSuccess()) {
- future.completeExceptionally(cf.cause());
- return;
- }
- Channel channel = cf.channel();
- try {
- channelInitializerHandler.initChannel(channel, sniHost);
- channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> {
- if (channelFuture.isSuccess()) {
- future.complete(channelFuture.channel());
- } else {
- future.completeExceptionally(channelFuture.cause());
- }
- });
- } catch (Exception e) {
- log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e);
- future.completeExceptionally(e);
- }
- });
- } else {
- bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
- if (channelFuture.isSuccess()) {
- future.complete(channelFuture.channel());
- } else {
- future.completeExceptionally(channelFuture.cause());
- }
- });
- }
- return future;
+ InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
+ return adapt(bootstrap.register())
+ .thenCompose(channel -> clientConfig.isUseTls()
+ ? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress)
+ : CompletableFuture.completedFuture(channel))
+ .thenCompose(channel -> adapt(channel.connect(remoteAddress)));
}
public void releaseConnection(ClientCnx cnx) {
@@ -364,7 +337,7 @@ public class ConnectionPool implements Closeable {
}
public static int signSafeMod(long dividend, int divisor) {
- int mod = (int) (dividend % (long) divisor);
+ int mod = (int) (dividend % divisor);
if (mod < 0) {
mod += divisor;
}
@@ -372,4 +345,17 @@ public class ConnectionPool implements Closeable {
}
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
+
+ private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) {
+ CompletableFuture<Channel> adapter = new CompletableFuture<>();
+ channelFuture.addListener((ChannelFuture cf) ->{
+ if (cf.isSuccess()) {
+ adapter.complete(channelFuture.channel());
+ } else {
+ adapter.completeExceptionally(channelFuture.cause());
+ }
+ });
+ return adapter;
+ }
}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 043151c..f50bed5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.impl;
import java.net.InetSocketAddress;
-import java.security.cert.X509Certificate;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -52,17 +52,15 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
private final Supplier<SslContext> sslContextSupplier;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
- private final boolean isSniProxyEnabled;
private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1);
- public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled)
+ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier)
throws Exception {
super();
this.clientCnxSupplier = clientCnxSupplier;
this.tlsEnabled = conf.isUseTls();
this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls();
- this.isSniProxyEnabled = isSniProxyEnabled;
if (tlsEnabled) {
if (tlsEnabledWithKeyStore) {
@@ -88,10 +86,10 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
return authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath(),
- (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey())
+ authData.getTlsCertificates(), authData.getTlsPrivateKey())
: SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
authData.getTlsTrustStoreStream(),
- (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
+ authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
@@ -107,33 +105,35 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
@Override
public void initChannel(SocketChannel ch) throws Exception {
- /**
- * skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the
- * channel explicitly.
- */
- if (!isSniProxyEnabled) {
- initChannel(ch, null);
- }
- }
- public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception {
- if (tlsEnabled) {
- if (tlsEnabledWithKeyStore) {
- ch.pipeline().addLast(TLS_HANDLER,
- new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
- } else {
- SslHandler handler = sniHost != null
- ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort())
- : sslContextSupplier.get().newHandler(ch.alloc());
- ch.pipeline().addLast(TLS_HANDLER, handler);
- }
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
- } else {
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
- }
+ // Setup channel except for the SslHandler for TLS enabled connections
+
+ ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
+
+ CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) {
+ if (!tlsEnabled) {
+ throw new IllegalStateException("TLS is not enabled in client configuration");
+ }
+ CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
+ ch.eventLoop().execute(() -> {
+ try {
+ SslHandler handler = tlsEnabledWithKeyStore
+ ? new SslHandler(nettySSLContextAutoRefreshBuilder.get()
+ .createSSLEngine(sniHost.getHostString(), sniHost.getPort()))
+ : sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort());
+ ch.pipeline().addFirst(TLS_HANDLER, handler);
+ initTlsFuture.complete(ch);
+ } catch (Throwable t) {
+ initTlsFuture.completeExceptionally(t);
+ }
+ });
+
+ return initTlsFuture;
+ }
}
+
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
index b9ad2e7..736d189 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -166,8 +166,14 @@ public class KeyStoreSSLContext {
}
public SSLEngine createSSLEngine() {
- SSLEngine sslEngine = sslContext.createSSLEngine();
+ return configureSSLEngine(sslContext.createSSLEngine());
+ }
+
+ public SSLEngine createSSLEngine(String peerHost, int peerPort) {
+ return configureSSLEngine(sslContext.createSSLEngine(peerHost, peerPort));
+ }
+ private SSLEngine configureSSLEngine(SSLEngine sslEngine) {
sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
@@ -177,7 +183,6 @@ public class KeyStoreSSLContext {
} else {
sslEngine.setUseClientMode(true);
}
-
return sslEngine;
}
@@ -353,3 +358,4 @@ public class KeyStoreSSLContext {
return sslCtxFactory;
}
}
+