You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 08:01:22 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 0813a1b40f8 [fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)
0813a1b40f8 is described below

commit 0813a1b40f80b350eb0daa24fd4a3880e3bfb92e
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu May 19 22:07:44 2022 +0800

    [fix][broker] fix MetadataStoreException$NotFoundException while doing topic lookup (#15633)
    
    (cherry picked from commit 70551a6f6be05826b90295d3da5915613e53fa46)
---
 .../pulsar/broker/loadbalance/ResourceUnit.java    |  5 +++++
 .../impl/ModularLoadManagerWrapper.java            | 14 ++++++++++++--
 .../loadbalance/impl/SimpleResourceUnit.java       | 22 ++++++++++++++++++++--
 .../pulsar/broker/namespace/NamespaceService.java  | 16 +++++++++++-----
 .../loadbalance/AdvertisedListenersTest.java       | 18 +++++++++++-------
 5 files changed, 59 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
index 1afde4e3657..51becdb7f77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ResourceUnit.java
@@ -22,9 +22,14 @@ package org.apache.pulsar.broker.loadbalance;
     ResourceUnit represents any machine/unit which has resources that broker can use to serve its service units
  */
 public interface ResourceUnit extends Comparable<ResourceUnit> {
+
+    String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";
+
     String getResourceId();
 
     ResourceDescription getAvailableResource();
 
     boolean canFit(ResourceDescription resourceDescription);
+
+    Object getProperty(String key);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index a138fe397b0..f2d61d2dcd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -64,8 +65,12 @@ public class ModularLoadManagerWrapper implements LoadManager {
     @Override
     public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
         Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
-        return leastLoadedBroker.map(s -> new SimpleResourceUnit(getBrokerWebServiceUrl(s),
-                new PulsarResourceDescription()));
+        return leastLoadedBroker.map(s -> {
+            String webServiceUrl = getBrokerWebServiceUrl(s);
+            String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
+            return new SimpleResourceUnit(webServiceUrl,
+                new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
+        });
     }
 
     private String getBrokerWebServiceUrl(String broker) {
@@ -77,6 +82,11 @@ public class ModularLoadManagerWrapper implements LoadManager {
         return String.format("http://%s", broker);
     }
 
+    private String getBrokerZnodeName(String broker, String webServiceUrl) {
+        String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
+        return String.format("%s://%s", scheme, broker);
+    }
+
     @Override
     public List<Metrics> getLoadBalancingMetrics() {
         return loadManager.getLoadBalancingMetrics();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
index 62f9b3e94a5..863a75dff42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceUnit.java
@@ -19,19 +19,32 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import com.google.common.base.MoreObjects;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.pulsar.broker.loadbalance.ResourceDescription;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 
 public class SimpleResourceUnit implements ResourceUnit {
 
-    private String resourceId;
-    private ResourceDescription resourceDescription;
+    private final String resourceId;
+    private final ResourceDescription resourceDescription;
+
+    private final Map<String, Object> properties;
 
     public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription) {
         this.resourceId = resourceId;
         this.resourceDescription = resourceDescription;
+        this.properties = Collections.emptyMap();
+    }
+
+    public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription,
+                              Map<String, Object> properties) {
+        this.resourceId = resourceId;
+        this.resourceDescription = resourceDescription;
+        this.properties = properties == null ? Collections.emptyMap() : properties;
     }
 
+
     @Override
     public String getResourceId() {
         // TODO Auto-generated method stub
@@ -50,6 +63,11 @@ public class SimpleResourceUnit implements ResourceUnit {
         return this.resourceDescription.compareTo(resourceDescription) > 0;
     }
 
+    @Override
+    public Object getProperty(String key) {
+        return properties.get(key);
+    }
+
     @Override
     public int compareTo(ResourceUnit o) {
         return resourceId.compareTo(o.getResourceId());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 25d935d8355..58c39ac9143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -456,6 +457,7 @@ public class NamespaceService implements AutoCloseable {
             return;
         }
         String candidateBroker = null;
+        String candidateBrokerAdvertisedAddr = null;
 
         LeaderElectionService les = pulsar.getLeaderElectionService();
         if (les == null) {
@@ -516,7 +518,7 @@ public class NamespaceService implements AutoCloseable {
                         }
                     }
                     if (makeLoadManagerDecisionOnThisBroker) {
-                        Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
+                        Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
                         if (!availableBroker.isPresent()) {
                             LOG.warn("Load manager didn't return any available broker. "
                                             + "Returning empty result to lookup. NamespaceBundle[{}]",
@@ -524,7 +526,8 @@ public class NamespaceService implements AutoCloseable {
                             lookupFuture.complete(Optional.empty());
                             return;
                         }
-                        candidateBroker = availableBroker.get();
+                        candidateBroker = availableBroker.get().getLeft();
+                        candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
                         authoritativeRedirect = true;
                     } else {
                         // forward to leader broker to make assignment
@@ -595,7 +598,8 @@ public class NamespaceService implements AutoCloseable {
                 }
 
                 // Now setting the redirect url
-                createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
+                createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
+                        : candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
                         .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                         .exceptionally(ex -> {
                             lookupFuture.completeExceptionally(ex);
@@ -690,7 +694,7 @@ public class NamespaceService implements AutoCloseable {
      * @return
      * @throws Exception
      */
-    private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
+    private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
         Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
         if (!leastLoadedBroker.isPresent()) {
             LOG.warn("No broker is available for {}", serviceUnit);
@@ -698,12 +702,14 @@ public class NamespaceService implements AutoCloseable {
         }
 
         String lookupAddress = leastLoadedBroker.get().getResourceId();
+        String advertisedAddr = (String) leastLoadedBroker.get()
+                .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
         if (LOG.isDebugEnabled()) {
             LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
                     pulsar.getSafeWebServiceAddress(),
                     lookupAddress);
         }
-        return Optional.of(lookupAddress);
+        return Optional.of(Pair.of(lookupAddress, advertisedAddr));
     }
 
     public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
index 6ff49674e2e..489efa5755b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
 import java.net.URI;
 import java.util.Optional;
 import lombok.Cleanup;
@@ -69,14 +71,14 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
         int httpsPort = PortManager.nextFreePort();
 
         // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
-        this.conf.setAdvertisedAddress(advertisedAddress);
-        this.conf.setAdvertisedListeners(
+        conf.setAdvertisedAddress(advertisedAddress);
+        conf.setAdvertisedListeners(
                 "public:pulsar://localhost:" + pulsarPort +
                         ",public_http:http://localhost:" + httpPort +
                         ",public_https:https://localhost:" + httpsPort);
-        this.conf.setBrokerServicePort(Optional.of(pulsarPort));
-        this.conf.setWebServicePort(Optional.of(httpPort));
-        this.conf.setWebServicePortTls(Optional.of(httpsPort));
+        conf.setBrokerServicePort(Optional.of(pulsarPort));
+        conf.setWebServicePort(Optional.of(httpPort));
+        conf.setWebServicePortTls(Optional.of(httpsPort));
     }
 
     @Test
@@ -85,6 +87,7 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
                 new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
         request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
         request.addHeader(HttpHeaders.ACCEPT, "application/json");
+        final String topic = "my-topic";
 
         @Cleanup
         CloseableHttpClient httpClient = HttpClients.createDefault();
@@ -104,14 +107,15 @@ public class AdvertisedListenersTest extends MultiBrokerBaseTest {
         // Produce data
         @Cleanup
         Producer<String> p = pulsarClient.newProducer(Schema.STRING)
-                .topic("my-topic")
+                .topic(topic)
                 .create();
 
         p.send("hello");
 
         // Verify we can get the correct HTTP redirect to the advertised listener
         for (PulsarAdmin a : getAllAdmins()) {
-            TopicStats s = a.topics().getStats("my-topic");
+            TopicStats s = a.topics().getStats(topic);
+            assertNotNull(a.lookups().lookupTopic(topic));
             assertEquals(s.getPublishers().size(), 1);
         }
     }