You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/19 17:48:55 UTC

(pulsar) branch branch-3.0 updated: [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5aedbdb2687 [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)
5aedbdb2687 is described below

commit 5aedbdb26878cdb3b67431f3e9a8fb444cb47773
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Tue Dec 5 20:18:53 2023 +0800

    [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)
    
    Co-authored-by: Jiwe Guo <te...@apache.org>
    (cherry picked from commit f8067b50c0d68cb723e5e5cd681b1697329ae012)
---
 .../pulsar/broker/loadbalance/NoopLoadManager.java |  2 +-
 .../loadbalance/extensions/BrokerRegistryImpl.java |  2 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  4 +-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  4 +-
 .../pulsar/broker/namespace/OwnershipCache.java    |  6 +--
 .../pulsar/broker/web/PulsarWebResource.java       |  2 +-
 .../loadbalance/SimpleLoadManagerImplTest.java     | 58 ++++++++++++++++++++--
 7 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 0de2ae92db6..80f887d394d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -61,7 +61,7 @@ public class NoopLoadManager implements LoadManager {
         localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
                 new PulsarResourceDescription());
 
-        LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+        LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
         localData.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 921ce35b5c6..bfdaa078f19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -84,7 +84,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         this.listeners = new ArrayList<>();
         this.brokerId = pulsar.getLookupServiceAddress();
         this.brokerLookupData = new BrokerLookupData(
-                pulsar.getSafeWebServiceAddress(),
+                pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(),
                 pulsar.getBrokerServiceUrlTls(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index b664b1ca2ba..255d7aa77af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -980,14 +980,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
             // At this point, the ports will be updated with the real port number that the server was assigned
             Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
 
-            lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+            lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                     pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             lastData.setProtocols(protocolData);
             // configure broker-topic mode
             lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
             lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
 
-            localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+            localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                     pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
             localData.setProtocols(protocolData);
             localData.setBrokerVersionString(pulsar.getBrokerVersion());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 5e994569711..d54579a2861 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -234,7 +234,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
         }
         this.policies = new SimpleResourceAllocationPolicies(pulsar);
-        lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+        lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
         lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
@@ -1072,7 +1072,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
     private LoadReport generateLoadReportForcefully() throws Exception {
         synchronized (bundleGainsCache) {
             try {
-                LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
+                LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(),
                         pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(),
                         pulsar.getBrokerServiceUrlTls());
                 loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 86003153714..0033abf36c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -122,10 +122,10 @@ public class OwnershipCache {
         this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
         this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
         this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                 false, pulsar.getAdvertisedListeners());
         this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
-                pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                 true, pulsar.getAdvertisedListeners());
         this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
         this.locallyAcquiredLocks = new ConcurrentHashMap<>();
@@ -336,7 +336,7 @@ public class OwnershipCache {
 
     public synchronized boolean refreshSelfOwnerInfo() {
         this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
-                pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
+                pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
         return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bdb052d7b9e..de1296dbdd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -1205,7 +1205,7 @@ public abstract class PulsarWebResource {
     protected void validateBrokerName(String broker) {
         String brokerUrl = String.format("http://%s", broker);
         String brokerUrlTls = String.format("https://%s", broker);
-        if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
+        if (!brokerUrl.equals(pulsar().getWebServiceAddress())
                 && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
             String[] parts = broker.split(":");
             checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index c4898786e3e..cf932ce5b60 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,12 +55,16 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -71,6 +75,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -92,8 +97,14 @@ public class SimpleLoadManagerImplTest {
     BrokerStats brokerStatsClient2;
 
     String primaryHost;
+
+    String primaryTlsHost;
+
     String secondaryHost;
 
+    private String defaultNamespace;
+    private String defaultTenant;
+
     ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
 
     @BeforeMethod
@@ -107,6 +118,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config1 = new ServiceConfiguration();
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
+        config1.setWebServicePortTls(Optional.of(0));
         config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -122,11 +134,13 @@ public class SimpleLoadManagerImplTest {
         admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
         brokerStatsClient1 = admin1.brokerStats();
         primaryHost = pulsar1.getWebServiceAddress();
+        primaryTlsHost = pulsar1.getWebServiceAddressTls();
 
         // Start broker 2
         ServiceConfiguration config2 = new ServiceConfiguration();
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
+        config2.setWebServicePortTls(Optional.of(0));
         config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -143,6 +157,8 @@ public class SimpleLoadManagerImplTest {
         brokerStatsClient2 = admin2.brokerStats();
         secondaryHost = pulsar2.getWebServiceAddress();
         Thread.sleep(100);
+
+        setupClusters();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -254,10 +270,9 @@ public class SimpleLoadManagerImplTest {
         sortedRankingsInstance.get().put(lr.getRank(rd), rus);
         setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
 
-        ResourceUnit found = loadManager
-                .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+        final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
         // broker is not active so found should be null
-        assertNotEquals(found, null, "did not find a broker when expected one to be found");
+        assertFalse(leastLoaded.isPresent());
 
     }
 
@@ -395,7 +410,7 @@ public class SimpleLoadManagerImplTest {
         final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
 
         for (final NamespaceBundle bundle : bundles) {
-            if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
+            if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) {
                 ++numAssignedToPrimary;
             } else {
                 ++numAssignedToSecondary;
@@ -407,6 +422,10 @@ public class SimpleLoadManagerImplTest {
         }
     }
 
+    private static String getAddress(String url) {
+        return url.replaceAll("https", "http");
+    }
+
     @Test
     public void testNamespaceBundleStats() {
         NamespaceBundleStats nsb1 = new NamespaceBundleStats();
@@ -475,4 +494,33 @@ public class SimpleLoadManagerImplTest {
         assertEquals(usage.getBandwidthIn().usage, usageLimit);
     }
 
+    @Test
+    public void testGetWebSerUrl() throws PulsarAdminException {
+        String webServiceUrl = admin1.brokerStats().getLoadReport().getWebServiceUrl();
+        Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress());
+
+        String webServiceUrl2 = admin2.brokerStats().getLoadReport().getWebServiceUrl();
+        Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress());
+    }
+
+    @Test
+    public void testRedirectOwner() throws PulsarAdminException {
+        final String topicName = "persistent://" + defaultNamespace + "/" + "test-topic";
+        admin1.topics().createNonPartitionedTopic(topicName);
+        TopicStats stats = admin1.topics().getStats(topicName);
+        Assert.assertNotNull(stats);
+
+        TopicStats stats2 = admin2.topics().getStats(topicName);
+        Assert.assertNotNull(stats2);
+    }
+
+    private void setupClusters() throws PulsarAdminException {
+        admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
+        defaultTenant = "prop-xyz";
+        admin1.tenants().createTenant(defaultTenant, tenantInfo);
+        defaultNamespace = defaultTenant + "/ns1";
+        admin1.namespaces().createNamespace(defaultNamespace, Set.of("use"));
+    }
+
 }