You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/26 20:51:44 UTC

(pulsar) 02/06: [fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong port if TLS is enabled (#21015)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 628e79ded04b8aeb0e146e717e8ed5442c566a6b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Sep 17 20:44:13 2023 -0500

    [fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong port if TLS is enabled (#21015)
    
    (cherry picked from commit 1363777918547b0f56bdbbf04cec28d050a43586)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 10 +++---
 .../apache/pulsar/broker/PulsarServiceTest.java    |  2 ++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 14 +++-----
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  2 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  4 ---
 .../loadbalance/AdvertisedListenersTest.java       |  2 --
 .../broker/loadbalance/LoadBalancerTest.java       |  1 -
 .../loadbalance/SimpleLoadManagerImplTest.java     |  1 -
 .../impl/ModularLoadManagerImplTest.java           |  6 ----
 .../OwnerShipForCurrentServerTestBase.java         |  2 --
 .../pulsar/broker/service/BrokerServiceTest.java   |  9 +++++
 .../broker/service/ClusterMigrationTest.java       |  9 +++++
 .../pulsar/broker/service/ReplicatorTest.java      |  2 +-
 .../broker/transaction/TransactionTestBase.java    |  2 --
 .../pulsar/client/api/BrokerServiceLookupTest.java |  7 +++-
 .../client/api/ClientAuthenticationTlsTest.java    |  4 ++-
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  |  1 +
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  1 +
 .../integration/containers/BrokerContainer.java    |  9 +++--
 .../integration/containers/ProxyContainer.java     |  9 +++--
 .../tests/integration/tls/ClientTlsTest.java       |  9 +++++
 .../integration/topologies/PulsarCluster.java      | 40 +++++++++++++++-------
 .../integration/topologies/PulsarClusterSpec.java  |  6 ++++
 23 files changed, 98 insertions(+), 54 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a366cf25aa0..50696bda37c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1733,18 +1733,18 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public String getSafeWebServiceAddress() {
-        return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
+        return webServiceAddressTls != null ? webServiceAddressTls : webServiceAddress;
     }
 
     @Deprecated
     public String getSafeBrokerServiceUrl() {
-        return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
+        return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
     }
 
     public String getLookupServiceAddress() {
-        return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent()
-                ? config.getWebServicePort().get()
-                : config.getWebServicePortTls().orElseThrow());
+        return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
+                ? config.getWebServicePortTls().get()
+                : config.getWebServicePort().orElseThrow());
     }
 
     public TopicPoliciesService getTopicPoliciesService() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 37a7310ae17..3e0887646e1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -54,6 +54,8 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
     @Override
     protected void doInitConf() throws Exception {
         super.doInitConf();
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
         if (useStaticPorts) {
             conf.setBrokerServicePortTls(Optional.of(6651));
             conf.setBrokerServicePort(Optional.of(6660));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 00134f8c918..723d576dea3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -439,19 +439,13 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         String tenantName = newUniqueName("prop-xyz2");
         admin.tenants().createTenant(tenantName, tenantInfo);
         admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
-        conf.setBrokerServicePort(Optional.of(1024));
-        conf.setBrokerServicePortTls(Optional.of(1025));
-        conf.setWebServicePort(Optional.of(1026));
-        conf.setWebServicePortTls(Optional.of(1027));
+        ServiceConfiguration config2 = super.getDefaultConf();
         @Cleanup
-        PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf);
+        PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
         PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
-        conf.setBrokerServicePort(Optional.of(2048));
-        conf.setBrokerServicePortTls(Optional.of(2049));
-        conf.setWebServicePort(Optional.of(2050));
-        conf.setWebServicePortTls(Optional.of(2051));
+        ServiceConfiguration config3 = super.getDefaultConf();
         @Cleanup
-        PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(conf);
+        PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
         PulsarService pulsar3 = pulsarTestContext.getPulsarService();
         @Cleanup
         PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 9fb3cb6fb23..e97707710d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -931,7 +931,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
         assertEquals(topicStats.getPublishers().size(), 0);
         assertEquals(topicStats.getOwnerBroker(),
-                pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get());
+                pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePortTls().get());
 
         PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
         assertEquals(internalStats.cursors.keySet(), Set.of(Codec.encode(subName)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 941a9356a86..cb51b8aee35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -233,10 +233,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         this.conf.setBrokerShutdownTimeoutMs(0L);
         this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         this.conf.setBrokerServicePort(Optional.of(0));
-        this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setAdvertisedAddress("localhost");
         this.conf.setWebServicePort(Optional.of(0));
-        this.conf.setWebServicePortTls(Optional.of(0));
         this.conf.setNumExecutorThreadPoolSize(5);
         this.conf.setExposeBundlesMetricsInPrometheus(true);
     }
@@ -492,9 +490,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         configuration.setBrokerShutdownTimeoutMs(0L);
         configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         configuration.setBrokerServicePort(Optional.of(0));
-        configuration.setBrokerServicePortTls(Optional.of(0));
         configuration.setWebServicePort(Optional.of(0));
-        configuration.setWebServicePortTls(Optional.of(0));
         configuration.setBookkeeperClientExposeStatsToPrometheus(true);
         configuration.setNumExecutorThreadPoolSize(5);
         configuration.setBrokerMaxConnections(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 7a8154312e4..a88ccd60ae4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -78,7 +78,6 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
                         ",public_https:https://localhost:" + httpsPort);
         conf.setBrokerServicePort(Optional.of(pulsarPort));
         conf.setWebServicePort(Optional.of(httpPort));
-        conf.setWebServicePortTls(Optional.of(httpsPort));
     }
 
     @Test
@@ -101,7 +100,6 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
 
         assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
         assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
-        assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");
 
 
         // Produce data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 68902c73e57..7cc4499df97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -124,7 +124,6 @@ public class LoadBalancerTest {
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setBrokerServicePortTls(Optional.of(0));
-            config.setWebServicePortTls(Optional.of(0));
             config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setBrokerShutdownTimeoutMs(0L);
             config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index c4898786e3e..6303c70b4dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -113,7 +113,6 @@ public class SimpleLoadManagerImplTest {
         config1.setBrokerServicePort(Optional.of(0));
         config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config1.setBrokerServicePortTls(Optional.of(0));
-        config1.setWebServicePortTls(Optional.of(0));
         config1.setAdvertisedAddress("localhost");
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index e3206ba5167..ceeb5704fb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -172,8 +172,6 @@ public class ModularLoadManagerImplTest {
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
-        config1.setBrokerServicePortTls(Optional.of(0));
-        config1.setWebServicePortTls(Optional.of(0));
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
 
@@ -192,8 +190,6 @@ public class ModularLoadManagerImplTest {
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
-        config2.setBrokerServicePortTls(Optional.of(0));
-        config2.setWebServicePortTls(Optional.of(0));
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
 
@@ -207,8 +203,6 @@ public class ModularLoadManagerImplTest {
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
-        config.setBrokerServicePortTls(Optional.of(0));
-        config.setWebServicePortTls(Optional.of(0));
         pulsar3 = new PulsarService(config);
 
         secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index 8dd4f53db82..46e8989ac3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -80,10 +80,8 @@ public abstract class OwnerShipForCurrentServerTestBase {
             conf.setBrokerShutdownTimeoutMs(0L);
             conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             conf.setBrokerServicePort(Optional.of(0));
-            conf.setBrokerServicePortTls(Optional.of(0));
             conf.setAdvertisedAddress("localhost");
             conf.setWebServicePort(Optional.of(0));
-            conf.setWebServicePortTls(Optional.of(0));
             serviceConfigurationList.add(conf);
 
             PulsarTestContext.Builder testContextBuilder =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index e0f2fadc2b6..43ea15a0391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1786,4 +1786,13 @@ public class BrokerServiceTest extends BrokerTestBase {
             fail("Unsubscribe failed");
         }
     }
+
+    @Test
+    public void testGetLookupServiceAddress() throws Exception {
+        cleanup();
+        setup();
+        conf.setWebServicePortTls(Optional.of(8081));
+        assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
+        resetState();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index c4f2cd047af..a2dcf3c9c0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.net.URL;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -477,6 +478,14 @@ public class ClusterMigrationTest {
             super.setupWithClusterName(clusterName);
         }
 
+        @Override
+        protected void doInitConf() throws Exception {
+            super.doInitConf();
+            this.conf.setWebServicePortTls(Optional.of(0));
+            this.conf.setBrokerServicePortTls(Optional.of(0));
+        }
+
+
         public PulsarService getPulsarService() {
             return pulsar;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 88a6f7c9f69..c372e3029ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -248,7 +248,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 -> admin2.clusters().getCluster(cluster2) != null);
 
         List<String> list = admin1.brokers().getActiveBrokers(cluster2);
-        assertEquals(list.get(0), url2.toString().replace("http://", ""));
+        assertEquals(list.get(0), urlTls2.toString().replace("https://", ""));
         //restore configuration
         pulsar1.getConfiguration().setAuthorizationEnabled(false);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index c0300c63b35..1ff835732aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -157,10 +157,8 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             conf.setBrokerShutdownTimeoutMs(0L);
             conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             conf.setBrokerServicePort(Optional.of(0));
-            conf.setBrokerServicePortTls(Optional.of(0));
             conf.setAdvertisedAddress("localhost");
             conf.setWebServicePort(Optional.of(0));
-            conf.setWebServicePortTls(Optional.of(0));
             conf.setTransactionCoordinatorEnabled(true);
             conf.setBrokerDeduplicationEnabled(true);
             conf.setTransactionBufferSnapshotMaxTransactionCount(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index c0cc06795ac..6becc9cb578 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -479,7 +479,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // request [3]
         doReturn(true).when(loadManager1).isCentralized();
         doReturn(true).when(loadManager2).isCentralized();
-        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddressTls(), null);
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
 
@@ -512,6 +512,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         loadManager1 = null;
         loadManager2 = null;
+
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
     }
 
     /**
@@ -943,6 +946,8 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         admin.topics().createPartitionedTopic(dest.toString(), totalPartitions);
 
         stopBroker();
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
         conf.setClientLibraryVersionCheckEnabled(true);
         startBroker();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
index c9b243257c4..d716d5a8063 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -50,7 +51,8 @@ public class ClientAuthenticationTlsTest extends ProducerConsumerBase {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
-
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setBrokerServicePortTls(Optional.of(0));
         conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
         conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
         conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index bde989fc432..2f36cc679f1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -69,6 +69,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase {
         conf.setAdvertisedAddress(null);
         conf.setAuthenticateOriginalAuthData(true);
         conf.setBrokerServicePort(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
         conf.setWebServicePort(Optional.of(0));
 
         Set<String> superUserRoles = new HashSet<>();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 01c06fbf52f..6247c2a66e8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -75,6 +75,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
 
     protected void doInitConf() throws Exception {
         super.doInitConf();
+        this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
         this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
index 616d45554d7..a51397050b9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
@@ -28,8 +28,13 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
     public static final String NAME = "pulsar-broker";
 
     public BrokerContainer(String clusterName, String hostName) {
-        super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_PORT_TLS,
-                BROKER_HTTP_PORT, BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
+        this(clusterName, hostName, false);
+    }
+
+    public BrokerContainer(String clusterName, String hostName, boolean enableTls) {
+        super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT,
+                enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
+                enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
         tailContainerLog();
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
index 53283447378..f3926878f37 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
@@ -28,8 +28,13 @@ public class ProxyContainer extends PulsarContainer<ProxyContainer> {
     public static final String NAME = "pulsar-proxy";
 
     public ProxyContainer(String clusterName, String hostName) {
-        super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_PORT_TLS, BROKER_HTTP_PORT,
-                BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
+        this(clusterName, hostName, false);
+    }
+
+    public ProxyContainer(String clusterName, String hostName, boolean enableTls) {
+        super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT,
+                enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
+                enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
index 59ff978cafa..080912cd492 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -41,6 +42,14 @@ public class ClientTlsTest extends PulsarTestSuite {
         return Resources.getResource("certificate-authority/" + name).getPath();
     }
 
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        specBuilder.enableTls(true);
+        return specBuilder;
+    }
+
     @DataProvider(name = "adminUrls")
     public Object[][] adminUrls() {
         return new Object[][]{
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 9b4823f46d4..769f135599b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.tests.integration.containers.BKContainer;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.CSContainer;
@@ -132,14 +133,16 @@ public class PulsarCluster {
         this.brokerContainers = Maps.newTreeMap();
         this.workerContainers = Maps.newTreeMap();
 
-        this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME)
+        this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
                 .withNetwork(network)
                 .withNetworkAliases(appendClusterName("pulsar-proxy"))
                 .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
                 .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
                 .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
-                .withEnv("clusterName", clusterName)
+                .withEnv("clusterName", clusterName);
                 // enable mTLS
+        if (spec.enableTls) {
+            proxyContainer
                 .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
                 .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
                 .withEnv("forwardAuthorizationCredentials", "true")
@@ -147,7 +150,15 @@ public class PulsarCluster {
                 .withEnv("tlsAllowInsecureConnection", "false")
                 .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
                 .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
-                .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
+                .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+                .withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName())
+                .withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
+                .withEnv("tlsEnabledWithBroker", "true")
+                .withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+                .withEnv("brokerClientCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+                .withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
+
+        }
         if (spec.proxyEnvs != null) {
             spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
         }
@@ -184,7 +195,7 @@ public class PulsarCluster {
         // create brokers
         brokerContainers.putAll(
             runNumContainers("broker", spec.numBrokers(), (name) -> {
-                BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name))
+                BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
                         .withNetwork(network)
                         .withNetworkAliases(appendClusterName(name))
                         .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
@@ -195,16 +206,19 @@ public class PulsarCluster {
                         .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
                         // used in s3 tests
                         .withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
-                        .withEnv("maxMessageSize", "" + spec.maxMessageSize)
+                        .withEnv("maxMessageSize", "" + spec.maxMessageSize);
+                    if (spec.enableTls) {
                         // enable mTLS
-                        .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
-                        .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
-                        .withEnv("authenticateOriginalAuthData", "true")
-                        .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
-                        .withEnv("tlsAllowInsecureConnection", "false")
-                        .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
-                        .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem")
-                        .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
+                        brokerContainer
+                            .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
+                            .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
+                            .withEnv("authenticateOriginalAuthData", "true")
+                            .withEnv("tlsAllowInsecureConnection", "false")
+                            .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+                            .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
+                            .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
+                            .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
+                    }
                     if (spec.queryLastMessage) {
                         brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
                         brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index fa28d20e6b3..c141e990d62 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -177,4 +177,10 @@ public class PulsarClusterSpec {
      * Additional ports to expose on bookie containers.
      */
     List<Integer> bookieAdditionalPorts;
+
+    /**
+     * Whether to enable TLS on the proxy and broker containers; disabled by default.
+     */
+    @Default
+    boolean enableTls = false;
 }