You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 08:01:22 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0813a1b40f8 [fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)
0813a1b40f8 is described below
commit 0813a1b40f80b350eb0daa24fd4a3880e3bfb92e
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu May 19 22:07:44 2022 +0800
[fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)
(cherry picked from commit 70551a6f6be05826b90295d3da5915613e53fa46)
---
.../pulsar/broker/loadbalance/ResourceUnit.java | 5 +++++
.../impl/ModularLoadManagerWrapper.java | 14 ++++++++++++--
.../loadbalance/impl/SimpleResourceUnit.java | 22 ++++++++++++++++++++--
.../pulsar/broker/namespace/NamespaceService.java | 16 +++++++++++-----
.../loadbalance/AdvertisedListenersTest.java | 18 +++++++++++-------
5 files changed, 59 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
index 1afde4e3657..51becdb7f77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
@@ -22,9 +22,14 @@ package org.apache.pulsar.broker.loadbalance;
ResourceUnit represents any machine/unit which has resources that broker can use to serve its service units
*/
public interface ResourceUnit extends Comparable<ResourceUnit> {
+
+ String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";
+
String getResourceId();
ResourceDescription getAvailableResource();
boolean canFit(ResourceDescription resourceDescription);
+
+ Object getProperty(String key);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index a138fe397b0..f2d61d2dcd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.PulsarServerException;
@@ -64,8 +65,12 @@ public class ModularLoadManagerWrapper implements LoadManager {
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
- return leastLoadedBroker.map(s -> new SimpleResourceUnit(getBrokerWebServiceUrl(s),
- new PulsarResourceDescription()));
+ return leastLoadedBroker.map(s -> {
+ String webServiceUrl = getBrokerWebServiceUrl(s);
+ String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
+ return new SimpleResourceUnit(webServiceUrl,
+ new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
+ });
}
private String getBrokerWebServiceUrl(String broker) {
@@ -77,6 +82,11 @@ public class ModularLoadManagerWrapper implements LoadManager {
return String.format("http://%s", broker);
}
+ private String getBrokerZnodeName(String broker, String webServiceUrl) {
+ String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
+ return String.format("%s://%s", scheme, broker);
+ }
+
@Override
public List<Metrics> getLoadBalancingMetrics() {
return loadManager.getLoadBalancingMetrics();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
index 62f9b3e94a5..863a75dff42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
@@ -19,19 +19,32 @@
package org.apache.pulsar.broker.loadbalance.impl;
import com.google.common.base.MoreObjects;
+import java.util.Collections;
+import java.util.Map;
import org.apache.pulsar.broker.loadbalance.ResourceDescription;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
public class SimpleResourceUnit implements ResourceUnit {
- private String resourceId;
- private ResourceDescription resourceDescription;
+ private final String resourceId;
+ private final ResourceDescription resourceDescription;
+
+ private final Map<String, Object> properties;
public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription) {
this.resourceId = resourceId;
this.resourceDescription = resourceDescription;
+ this.properties = Collections.emptyMap();
+ }
+
+ public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription,
+ Map<String, Object> properties) {
+ this.resourceId = resourceId;
+ this.resourceDescription = resourceDescription;
+ this.properties = properties == null ? Collections.emptyMap() : properties;
}
+
@Override
public String getResourceId() {
// TODO Auto-generated method stub
@@ -50,6 +63,11 @@ public class SimpleResourceUnit implements ResourceUnit {
return this.resourceDescription.compareTo(resourceDescription) > 0;
}
+ @Override
+ public Object getProperty(String key) {
+ return properties.get(key);
+ }
+
@Override
public int compareTo(ResourceUnit o) {
return resourceId.compareTo(o.getResourceId());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 25d935d8355..58c39ac9143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -456,6 +457,7 @@ public class NamespaceService implements AutoCloseable {
return;
}
String candidateBroker = null;
+ String candidateBrokerAdvertisedAddr = null;
LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
@@ -516,7 +518,7 @@ public class NamespaceService implements AutoCloseable {
}
}
if (makeLoadManagerDecisionOnThisBroker) {
- Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
+ Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
@@ -524,7 +526,8 @@ public class NamespaceService implements AutoCloseable {
lookupFuture.complete(Optional.empty());
return;
}
- candidateBroker = availableBroker.get();
+ candidateBroker = availableBroker.get().getLeft();
+ candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
@@ -595,7 +598,8 @@ public class NamespaceService implements AutoCloseable {
}
// Now setting the redirect url
- createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
+ createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
+ : candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
@@ -690,7 +694,7 @@ public class NamespaceService implements AutoCloseable {
* @return
* @throws Exception
*/
- private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
+ private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
if (!leastLoadedBroker.isPresent()) {
LOG.warn("No broker is available for {}", serviceUnit);
@@ -698,12 +702,14 @@ public class NamespaceService implements AutoCloseable {
}
String lookupAddress = leastLoadedBroker.get().getResourceId();
+ String advertisedAddr = (String) leastLoadedBroker.get()
+ .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
lookupAddress);
}
- return Optional.of(lookupAddress);
+ return Optional.of(Pair.of(lookupAddress, advertisedAddr));
}
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 6ff49674e2e..489efa5755b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
@@ -69,14 +71,14 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
int httpsPort = PortManager.nextFreePort();
// Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
- this.conf.setAdvertisedAddress(advertisedAddress);
- this.conf.setAdvertisedListeners(
+ conf.setAdvertisedAddress(advertisedAddress);
+ conf.setAdvertisedListeners(
"public:pulsar://localhost:" + pulsarPort +
",public_http:http://localhost:" + httpPort +
",public_https:https://localhost:" + httpsPort);
- this.conf.setBrokerServicePort(Optional.of(pulsarPort));
- this.conf.setWebServicePort(Optional.of(httpPort));
- this.conf.setWebServicePortTls(Optional.of(httpsPort));
+ conf.setBrokerServicePort(Optional.of(pulsarPort));
+ conf.setWebServicePort(Optional.of(httpPort));
+ conf.setWebServicePortTls(Optional.of(httpsPort));
}
@Test
@@ -85,6 +87,7 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.addHeader(HttpHeaders.ACCEPT, "application/json");
+ final String topic = "my-topic";
@Cleanup
CloseableHttpClient httpClient = HttpClients.createDefault();
@@ -104,14 +107,15 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
// Produce data
@Cleanup
Producer<String> p = pulsarClient.newProducer(Schema.STRING)
- .topic("my-topic")
+ .topic(topic)
.create();
p.send("hello");
// Verify we can get the correct HTTP redirect to the advertised listener
for (PulsarAdmin a : getAllAdmins()) {
- TopicStats s = a.topics().getStats("my-topic");
+ TopicStats s = a.topics().getStats(topic);
+ assertNotNull(a.lookups().lookupTopic(topic));
assertEquals(s.getPublishers().size(), 1);
}
}