You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/26 22:54:41 UTC
(pulsar) 07/11: [fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong port if TLS is enabled (#21015)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8e2eb50d9c93748e4426fcad1bfad28f69bb7faf
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Sep 17 20:44:13 2023 -0500
[fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong port if TLS is enabled (#21015)
(cherry picked from commit 1363777918547b0f56bdbbf04cec28d050a43586)
---
.../org/apache/pulsar/broker/PulsarService.java | 10 +++---
.../apache/pulsar/broker/PulsarServiceTest.java | 2 ++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 14 +++-----
.../apache/pulsar/broker/admin/AdminApiTest.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 4 ---
.../loadbalance/AdvertisedListenersTest.java | 2 --
.../broker/loadbalance/LoadBalancerTest.java | 1 -
.../loadbalance/SimpleLoadManagerImplTest.java | 1 -
.../impl/ModularLoadManagerImplTest.java | 6 ----
.../OwnerShipForCurrentServerTestBase.java | 2 --
.../pulsar/broker/service/BrokerServiceTest.java | 9 +++++
.../broker/service/ClusterMigrationTest.java | 9 +++++
.../pulsar/broker/service/ReplicatorTest.java | 2 +-
.../broker/transaction/TransactionTestBase.java | 2 --
.../pulsar/client/api/BrokerServiceLookupTest.java | 7 +++-
.../client/api/ClientAuthenticationTlsTest.java | 4 ++-
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 1 +
.../proxy/server/ProxyServiceTlsStarterTest.java | 1 +
.../integration/containers/BrokerContainer.java | 9 +++--
.../integration/containers/ProxyContainer.java | 9 +++--
.../tests/integration/tls/ClientTlsTest.java | 9 +++++
.../integration/topologies/PulsarCluster.java | 40 +++++++++++++++-------
.../integration/topologies/PulsarClusterSpec.java | 6 ++++
23 files changed, 98 insertions(+), 54 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index edcc83df2b8..f1f630316a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1734,18 +1734,18 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
public String getSafeWebServiceAddress() {
- return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
+ return webServiceAddressTls != null ? webServiceAddressTls : webServiceAddress;
}
@Deprecated
public String getSafeBrokerServiceUrl() {
- return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
+ return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
}
public String getLookupServiceAddress() {
- return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent()
- ? config.getWebServicePort().get()
- : config.getWebServicePortTls().get());
+ return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
+ ? config.getWebServicePortTls().get()
+ : config.getWebServicePort().orElseThrow());
}
public TopicPoliciesService getTopicPoliciesService() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 37a7310ae17..3e0887646e1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -54,6 +54,8 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index ba1a4f36b17..0f94a8813c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -439,19 +439,13 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
String tenantName = newUniqueName("prop-xyz2");
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
- conf.setBrokerServicePort(Optional.of(1024));
- conf.setBrokerServicePortTls(Optional.of(1025));
- conf.setWebServicePort(Optional.of(1026));
- conf.setWebServicePortTls(Optional.of(1027));
+ ServiceConfiguration config2 = super.getDefaultConf();
@Cleanup
- PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf);
+ PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
- conf.setBrokerServicePort(Optional.of(2048));
- conf.setBrokerServicePortTls(Optional.of(2049));
- conf.setWebServicePort(Optional.of(2050));
- conf.setWebServicePortTls(Optional.of(2051));
+ ServiceConfiguration config3 = super.getDefaultConf();
@Cleanup
- PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(conf);
+ PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
PulsarService pulsar3 = pulsarTestContext.getPulsarService();
@Cleanup
PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5cf95dbdc8c..b72f1b697f7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -930,7 +930,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getOwnerBroker(),
- pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get());
+ pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePortTls().get());
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Set.of(Codec.encode(subName)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 16db0f48480..62d2b2aafa7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -230,10 +230,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
this.conf.setBrokerShutdownTimeoutMs(0L);
this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
this.conf.setBrokerServicePort(Optional.of(0));
- this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
- this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
}
@@ -473,9 +471,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
configuration.setBrokerShutdownTimeoutMs(0L);
configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
configuration.setBrokerServicePort(Optional.of(0));
- configuration.setBrokerServicePortTls(Optional.of(0));
configuration.setWebServicePort(Optional.of(0));
- configuration.setWebServicePortTls(Optional.of(0));
configuration.setBookkeeperClientExposeStatsToPrometheus(true);
configuration.setNumExecutorThreadPoolSize(5);
configuration.setBrokerMaxConnections(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 7a8154312e4..a88ccd60ae4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -78,7 +78,6 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
",public_https:https://localhost:" + httpsPort);
conf.setBrokerServicePort(Optional.of(pulsarPort));
conf.setWebServicePort(Optional.of(httpPort));
- conf.setWebServicePortTls(Optional.of(httpsPort));
}
@Test
@@ -101,7 +100,6 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
- assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");
// Produce data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 68902c73e57..7cc4499df97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -124,7 +124,6 @@ public class LoadBalancerTest {
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
- config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index c4898786e3e..6303c70b4dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -113,7 +113,6 @@ public class SimpleLoadManagerImplTest {
config1.setBrokerServicePort(Optional.of(0));
config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config1.setBrokerServicePortTls(Optional.of(0));
- config1.setWebServicePortTls(Optional.of(0));
config1.setAdvertisedAddress("localhost");
pulsar1 = new PulsarService(config1);
pulsar1.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 786c9027c94..d8acb6d24e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -169,8 +169,6 @@ public class ModularLoadManagerImplTest {
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
- config1.setBrokerServicePortTls(Optional.of(0));
- config1.setWebServicePortTls(Optional.of(0));
pulsar1 = new PulsarService(config1);
pulsar1.start();
@@ -189,8 +187,6 @@ public class ModularLoadManagerImplTest {
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
- config2.setBrokerServicePortTls(Optional.of(0));
- config2.setWebServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
pulsar2.start();
@@ -204,8 +200,6 @@ public class ModularLoadManagerImplTest {
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
- config.setBrokerServicePortTls(Optional.of(0));
- config.setWebServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);
secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 8dd4f53db82..46e8989ac3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -80,10 +80,8 @@ public abstract class OwnerShipForCurrentServerTestBase {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
- conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
- conf.setWebServicePortTls(Optional.of(0));
serviceConfigurationList.add(conf);
PulsarTestContext.Builder testContextBuilder =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 10c90fd434d..f07a1b99c3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1646,4 +1646,13 @@ public class BrokerServiceTest extends BrokerTestBase {
fail("Unsubscribe failed");
}
}
+
+ @Test
+ public void testGetLookupServiceAddress() throws Exception {
+ cleanup();
+ setup();
+ conf.setWebServicePortTls(Optional.of(8081));
+ assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
+ resetState();
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index cfa9a5f9b04..b36bf33aff4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -477,6 +478,14 @@ public class ClusterMigrationTest {
super.setupWithClusterName(clusterName);
}
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setWebServicePortTls(Optional.of(0));
+ this.conf.setBrokerServicePortTls(Optional.of(0));
+ }
+
+
public PulsarService getPulsarService() {
return pulsar;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d0fd3365f96..3d955250608 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -248,7 +248,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
-> admin2.clusters().getCluster(cluster2) != null);
List<String> list = admin1.brokers().getActiveBrokers(cluster2);
- assertEquals(list.get(0), url2.toString().replace("http://", ""));
+ assertEquals(list.get(0), urlTls2.toString().replace("https://", ""));
//restore configuration
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index cd0c089ad41..3a83d2f95fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -156,10 +156,8 @@ public abstract class TransactionTestBase extends TestRetrySupport {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
- conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
- conf.setWebServicePortTls(Optional.of(0));
conf.setTransactionCoordinatorEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 3ab5e926054..a632608bf70 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -485,7 +485,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// request [3]
doReturn(true).when(loadManager1).isCentralized();
doReturn(true).when(loadManager2).isCentralized();
- SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddressTls(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
@@ -518,6 +518,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
loadManager1 = null;
loadManager2 = null;
+
+ conf.setBrokerServicePortTls(Optional.empty());
+ conf.setWebServicePortTls(Optional.empty());
}
/**
@@ -949,6 +952,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
admin.topics().createPartitionedTopic(dest.toString(), totalPartitions);
stopBroker();
+ conf.setBrokerServicePortTls(Optional.empty());
+ conf.setWebServicePortTls(Optional.empty());
conf.setClientLibraryVersionCheckEnabled(true);
startBroker();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
index c9b243257c4..d716d5a8063 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -50,7 +51,8 @@ public class ClientAuthenticationTlsTest extends ProducerConsumerBase {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
-
+ conf.setWebServicePortTls(Optional.of(0));
+ conf.setBrokerServicePortTls(Optional.of(0));
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index bde989fc432..2f36cc679f1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -69,6 +69,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase {
conf.setAdvertisedAddress(null);
conf.setAuthenticateOriginalAuthData(true);
conf.setBrokerServicePort(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
Set<String> superUserRoles = new HashSet<>();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 01c06fbf52f..6247c2a66e8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -75,6 +75,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
protected void doInitConf() throws Exception {
super.doInitConf();
+ this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
index 616d45554d7..a51397050b9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
@@ -28,8 +28,13 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
public static final String NAME = "pulsar-broker";
public BrokerContainer(String clusterName, String hostName) {
- super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_PORT_TLS,
- BROKER_HTTP_PORT, BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
+ this(clusterName, hostName, false);
+ }
+
+ public BrokerContainer(String clusterName, String hostName, boolean enableTls) {
+ super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT,
+ enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
+ enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
tailContainerLog();
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
index 53283447378..f3926878f37 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
@@ -28,8 +28,13 @@ public class ProxyContainer extends PulsarContainer<ProxyContainer> {
public static final String NAME = "pulsar-proxy";
public ProxyContainer(String clusterName, String hostName) {
- super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_PORT_TLS, BROKER_HTTP_PORT,
- BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
+ this(clusterName, hostName, false);
+ }
+
+ public ProxyContainer(String clusterName, String hostName, boolean enableTls) {
+ super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT,
+ enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
+ enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
index 59ff978cafa..080912cd492 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -41,6 +42,14 @@ public class ClientTlsTest extends PulsarTestSuite {
return Resources.getResource("certificate-authority/" + name).getPath();
}
+ @Override
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ specBuilder.enableTls(true);
+ return specBuilder;
+ }
+
@DataProvider(name = "adminUrls")
public Object[][] adminUrls() {
return new Object[][]{
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index ca5fe4b3852..ce308e4a386 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
@@ -132,14 +133,16 @@ public class PulsarCluster {
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
- this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME)
+ this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
- .withEnv("clusterName", clusterName)
+ .withEnv("clusterName", clusterName);
// enable mTLS
+ if (spec.enableTls) {
+ proxyContainer
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("forwardAuthorizationCredentials", "true")
@@ -147,7 +150,15 @@ public class PulsarCluster {
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
- .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
+ .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+ .withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName())
+ .withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
+ .withEnv("tlsEnabledWithBroker", "true")
+ .withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+ .withEnv("brokerClientCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+ .withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
+
+ }
if (spec.proxyEnvs != null) {
spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
}
@@ -184,7 +195,7 @@ public class PulsarCluster {
// create brokers
brokerContainers.putAll(
runNumContainers("broker", spec.numBrokers(), (name) -> {
- BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name))
+ BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName(name))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
@@ -195,16 +206,19 @@ public class PulsarCluster {
.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
- .withEnv("maxMessageSize", "" + spec.maxMessageSize)
+ .withEnv("maxMessageSize", "" + spec.maxMessageSize);
+ if (spec.enableTls) {
// enable mTLS
- .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
- .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
- .withEnv("authenticateOriginalAuthData", "true")
- .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
- .withEnv("tlsAllowInsecureConnection", "false")
- .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
- .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem")
- .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
+ brokerContainer
+ .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
+ .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
+ .withEnv("authenticateOriginalAuthData", "true")
+ .withEnv("tlsAllowInsecureConnection", "false")
+ .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+ .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+ .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
+ .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
+ }
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index fa28d20e6b3..c141e990d62 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -177,4 +177,10 @@ public class PulsarClusterSpec {
* Additional ports to expose on bookie containers.
*/
List<Integer> bookieAdditionalPorts;
+
+ /**
+ * Enable TLS for connection.
+ */
+ @Default
+ boolean enableTls = false;
}