You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/19 17:48:55 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5aedbdb2687 [fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)
5aedbdb2687 is described below
commit 5aedbdb26878cdb3b67431f3e9a8fb444cb47773
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Tue Dec 5 20:18:53 2023 +0800
[fix][broker] Fix returns wrong webServiceUrl when both webServicePort and webServicePortTls are set (#21633)
Co-authored-by: Jiwe Guo <te...@apache.org>
(cherry picked from commit f8067b50c0d68cb723e5e5cd681b1697329ae012)
---
.../pulsar/broker/loadbalance/NoopLoadManager.java | 2 +-
.../loadbalance/extensions/BrokerRegistryImpl.java | 2 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 4 +-
.../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +-
.../pulsar/broker/namespace/OwnershipCache.java | 6 +--
.../pulsar/broker/web/PulsarWebResource.java | 2 +-
.../loadbalance/SimpleLoadManagerImplTest.java | 58 ++++++++++++++++++++--
7 files changed, 63 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 0de2ae92db6..80f887d394d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -61,7 +61,7 @@ public class NoopLoadManager implements LoadManager {
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
- LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
+ LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 921ce35b5c6..bfdaa078f19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -84,7 +84,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
this.listeners = new ArrayList<>();
this.brokerId = pulsar.getLookupServiceAddress();
this.brokerLookupData = new BrokerLookupData(
- pulsar.getSafeWebServiceAddress(),
+ pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index b664b1ca2ba..255d7aa77af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -980,14 +980,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// At this point, the ports will be updated with the real port number that the server was assigned
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
- lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
- localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 5e994569711..d54579a2861 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -234,7 +234,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
}
this.policies = new SimpleResourceAllocationPolicies(pulsar);
- lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
@@ -1072,7 +1072,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
try {
- LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
+ LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls());
loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 86003153714..0033abf36c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -122,10 +122,10 @@ public class OwnershipCache {
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
- pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
true, pulsar.getAdvertisedListeners());
this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
this.locallyAcquiredLocks = new ConcurrentHashMap<>();
@@ -336,7 +336,7 @@ public class OwnershipCache {
public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
- pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
+ pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bdb052d7b9e..de1296dbdd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -1205,7 +1205,7 @@ public abstract class PulsarWebResource {
protected void validateBrokerName(String broker) {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
- if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
+ if (!brokerUrl.equals(pulsar().getWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index c4898786e3e..cf932ce5b60 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,12 +55,16 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ResourceQuota;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -71,6 +75,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -92,8 +97,14 @@ public class SimpleLoadManagerImplTest {
BrokerStats brokerStatsClient2;
String primaryHost;
+
+ String primaryTlsHost;
+
String secondaryHost;
+ private String defaultNamespace;
+ private String defaultTenant;
+
ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@BeforeMethod
@@ -107,6 +118,7 @@ public class SimpleLoadManagerImplTest {
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
+ config1.setWebServicePortTls(Optional.of(0));
config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -122,11 +134,13 @@ public class SimpleLoadManagerImplTest {
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
brokerStatsClient1 = admin1.brokerStats();
primaryHost = pulsar1.getWebServiceAddress();
+ primaryTlsHost = pulsar1.getWebServiceAddressTls();
// Start broker 2
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
+ config2.setWebServicePortTls(Optional.of(0));
config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -143,6 +157,8 @@ public class SimpleLoadManagerImplTest {
brokerStatsClient2 = admin2.brokerStats();
secondaryHost = pulsar2.getWebServiceAddress();
Thread.sleep(100);
+
+ setupClusters();
}
@AfterMethod(alwaysRun = true)
@@ -254,10 +270,9 @@ public class SimpleLoadManagerImplTest {
sortedRankingsInstance.get().put(lr.getRank(rd), rus);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);
- ResourceUnit found = loadManager
- .getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
+ final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
// broker is not active so found should be null
- assertNotEquals(found, null, "did not find a broker when expected one to be found");
+ assertFalse(leastLoaded.isPresent());
}
@@ -395,7 +410,7 @@ public class SimpleLoadManagerImplTest {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
for (final NamespaceBundle bundle : bundles) {
- if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
+ if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
@@ -407,6 +422,10 @@ public class SimpleLoadManagerImplTest {
}
}
+ private static String getAddress(String url) {
+ return url.replaceAll("https", "http");
+ }
+
@Test
public void testNamespaceBundleStats() {
NamespaceBundleStats nsb1 = new NamespaceBundleStats();
@@ -475,4 +494,33 @@ public class SimpleLoadManagerImplTest {
assertEquals(usage.getBandwidthIn().usage, usageLimit);
}
+ @Test
+ public void testGetWebSerUrl() throws PulsarAdminException {
+ String webServiceUrl = admin1.brokerStats().getLoadReport().getWebServiceUrl();
+ Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress());
+
+ String webServiceUrl2 = admin2.brokerStats().getLoadReport().getWebServiceUrl();
+ Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress());
+ }
+
+ @Test
+ public void testRedirectOwner() throws PulsarAdminException {
+ final String topicName = "persistent://" + defaultNamespace + "/" + "test-topic";
+ admin1.topics().createNonPartitionedTopic(topicName);
+ TopicStats stats = admin1.topics().getStats(topicName);
+ Assert.assertNotNull(stats);
+
+ TopicStats stats2 = admin2.topics().getStats(topicName);
+ Assert.assertNotNull(stats2);
+ }
+
+ private void setupClusters() throws PulsarAdminException {
+ admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
+ defaultTenant = "prop-xyz";
+ admin1.tenants().createTenant(defaultTenant, tenantInfo);
+ defaultNamespace = defaultTenant + "/ns1";
+ admin1.namespaces().createNamespace(defaultNamespace, Set.of("use"));
+ }
+
}