Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/01 13:49:14 UTC

[GitHub] [pulsar] lhotari opened a new pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari opened a new pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069


   ### Motivation
   
   This PR depends on #13066.
   
   When a lookup is made for a topic that isn't currently loaded, the ownership decision is made in a distributed fashion on the follower brokers, because information about the leader broker is missing (`LeaderElectionService.getCurrentLeader()` always returned `Optional.empty()`). This leads to races when assigning topic ownership to a broker, since the decision isn't made centrally on the leader broker.
   
   Besides the problem fixed by #13066, there's another race condition in metadata operations:
   
   ```
   2021-12-01T15:19:16,659+0200 [metadata-store-266-1] WARN  com.github.benmanes.caffeine.cache.LocalAsyncCache - Exception thrown during asynchronous load
   java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$create$12(MetadataCacheImpl.java:232) ~[classes/:?]
   	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$storePut$9(ZKMetadataStore.java:222) ~[classes/:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   ```
   
   This problem hasn't been fixed yet; the goal is to fix it before this PR is completed. The PR will stay in draft state while the fix is in progress.
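   The `BadVersion` error above is the signature of an optimistic-concurrency conflict: a write to `/admin/local-policies/public/default` carried an expected version that no longer matched the stored one, most likely because several brokers tried to write the same node at about the same time. A minimal sketch of that failure mode, using a hypothetical in-memory `VersionedStore` as a stand-in for ZooKeeper (this is not Pulsar's `MetadataStore` API):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for a versioned metadata store such as ZooKeeper:
   // a write succeeds only if the caller's expected version matches the stored one.
   final class VersionedStore {
       static final class BadVersionException extends Exception {
           BadVersionException(String path) {
               super("BadVersion for " + path);
           }
       }

       private static final class Entry {
           final String value;
           final long version;

           Entry(String value, long version) {
               this.value = value;
               this.version = version;
           }
       }

       private final Map<String, Entry> entries = new HashMap<>();

       /** Returns the stored version of a path, or -1 if the path doesn't exist. */
       synchronized long version(String path) {
           Entry e = entries.get(path);
           return e == null ? -1 : e.version;
       }

       /** Returns the stored value, or null if the path doesn't exist. */
       synchronized String get(String path) {
           Entry e = entries.get(path);
           return e == null ? null : e.value;
       }

       /** Compare-and-set: writes only if the stored version equals expectedVersion. */
       synchronized long put(String path, String value, long expectedVersion)
               throws BadVersionException {
           long current = version(path);
           if (current != expectedVersion) {
               throw new BadVersionException(path);
           }
           long next = current + 1;
           entries.put(path, new Entry(value, next));
           return next;
       }
   }
   ```

   If two callers both observe version `-1` (node absent) and then call `put` with it, the first write succeeds and creates version `0`, while the second fails with `BadVersionException` and the first caller's value remains.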
   
   ### Modifications
   
   - builds upon the changes from #13066
   - adds a failing test case `MultiBrokerLeaderElectionTest.shouldProvideConsistentAnswerToTopicLookup`
   - adds more logging to the topic ownership assignment decision made in `NamespaceService.searchForCandidateBroker`
   - optimizes the performance of topic ownership assignment by using `LoadManager.getAvailableBrokers` instead of reading the broker list from ZooKeeper every time
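   The last item replaces a blocking `getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join()` call on every check with a lookup in the broker set that the load manager already tracks. A rough sketch of that idea, assuming a hypothetical `AvailableBrokersView` that is kept current by registration callbacks (the real `LoadManager` internals may work differently):

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical in-memory view of active brokers ("host:port" strings). It is
   // updated when brokers register or disappear, so checking membership doesn't
   // require a round trip to the metadata store.
   final class AvailableBrokersView {
       private final Set<String> brokers = ConcurrentHashMap.newKeySet();

       void onBrokerRegistered(String hostAndPort) {
           brokers.add(hostAndPort);
       }

       void onBrokerUnregistered(String hostAndPort) {
           brokers.remove(hostAndPort);
       }

       // Average constant-time check against the cached set.
       boolean isBrokerActive(String hostAndPort) {
           return brokers.contains(hostAndPort);
       }
   }
   ```

   The trade-off is freshness: a cached set can briefly lag behind ZooKeeper, whereas the original code read the current children on every call.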


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985408661


   The PR to backport this fix to branch-2.8 is #13117.


[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761812117



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {

Review comment:
       Exactly. Typically, `Error`s shouldn't be caught or retried.
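       For illustration, the retry policy in `handleLoadBundlesRetry` (give up on an `Error` or once the deadline has passed, and presumably otherwise schedule another attempt using `backoff`) can be sketched as a generic helper. This is a simplified sketch, not the PR's code: the exponential backoff is hand-rolled instead of using Pulsar's `Backoff` class, and every non-`Error` failure of the operation is retried, whereas the diff only routes the `readBundles` and `copyToLocalPolicies` failures through the retry handler.

       ```java
       import java.util.concurrent.CompletableFuture;
       import java.util.concurrent.CompletionException;
       import java.util.concurrent.ScheduledExecutorService;
       import java.util.concurrent.TimeUnit;
       import java.util.function.Supplier;

       final class DeadlineRetry {
           private DeadlineRetry() {}

           // Runs an async operation, retrying failures with exponential backoff until it
           // succeeds, the retry deadline passes, or it fails with an Error (never retried).
           static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation,
                                                 ScheduledExecutorService scheduler,
                                                 long initialDelayMs, long maxDelayMs,
                                                 long maxRetryDurationMs) {
               CompletableFuture<T> result = new CompletableFuture<>();
               long retryDeadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxRetryDurationMs);
               attempt(operation, scheduler, initialDelayMs, maxDelayMs, retryDeadline, result);
               return result;
           }

           private static <T> void attempt(Supplier<CompletableFuture<T>> operation,
                                           ScheduledExecutorService scheduler,
                                           long delayMs, long maxDelayMs, long retryDeadline,
                                           CompletableFuture<T> result) {
               CompletableFuture<T> future;
               try {
                   future = operation.get();
               } catch (Throwable t) {
                   // Route synchronous throws through the same policy below (Errors included).
                   future = CompletableFuture.failedFuture(t);
               }
               future.whenComplete((value, ex) -> {
                   if (ex == null) {
                       result.complete(value);
                       return;
                   }
                   Throwable cause = unwrap(ex);
                   if (cause instanceof Error || System.nanoTime() > retryDeadline) {
                       result.completeExceptionally(cause);
                       return;
                   }
                   long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
                   scheduler.schedule(
                           () -> attempt(operation, scheduler, nextDelayMs, maxDelayMs, retryDeadline, result),
                           delayMs, TimeUnit.MILLISECONDS);
               });
           }

           // Dependent stages may report failures wrapped in a CompletionException.
           private static Throwable unwrap(Throwable ex) {
               return ex instanceof CompletionException && ex.getCause() != null ? ex.getCause() : ex;
           }
       }
       ```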




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761796040



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -454,6 +454,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
             // The leader election service was not initialized yet. This can happen because the broker service is
             // initialized first and it might start receiving lookup requests before the leader election service is
             // fully initialized.
+            LOG.warn("Leader election service isn't initialized yet. Returning empty result to lookup.");

Review comment:
       Fixed




[GitHub] [pulsar] lhotari merged pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari merged pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069


   


[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985354881


   I have a fix for branch-2.8 in https://github.com/lhotari/pulsar/commit/1a350804 and I'll make similar changes to this PR, since they aren't 2.8-specific.


[GitHub] [pulsar] eolivelli commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761812502



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {

Review comment:
       +1




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761431220



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }

Review comment:
       Makes sense. I missed that opportunity in the refactoring.




[GitHub] [pulsar] eolivelli commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761452305



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);

Review comment:
       `URI` is generally expensive to create. Do we need it here, or is it worth doing this another way?
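       One lighter-weight option, as a sketch only: extract `host:port` with plain string operations instead of constructing a `java.net.URI`. The `parseHostAndPort` name mirrors the helper that appears later in this PR, but this body is hypothetical and assumes the broker URL always has the form `scheme://host:port[/path]` with an explicit port; unlike `uri.getHost() + ":" + uri.getPort()`, it doesn't validate the input or handle user-info.

       ```java
       // Hypothetical helper: turns a broker URL such as "http://broker-1:8080" into
       // "broker-1:8080" without creating a java.net.URI. Assumes an explicit port.
       final class HostAndPort {
           private HostAndPort() {}

           static String parseHostAndPort(String brokerUrl) {
               // Skip the "scheme://" prefix if there is one.
               int schemeEnd = brokerUrl.indexOf("://");
               String rest = schemeEnd >= 0 ? brokerUrl.substring(schemeEnd + 3) : brokerUrl;
               // Drop any path that follows the authority part.
               int pathStart = rest.indexOf('/');
               return pathStart >= 0 ? rest.substring(0, pathStart) : rest;
           }
       }
       ```

       For example, `HostAndPort.parseHostAndPort("http://broker-1:8080/admin")` yields `broker-1:8080`.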

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -454,6 +454,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
             // The leader election service was not initialized yet. This can happen because the broker service is
             // initialized first and it might start receiving lookup requests before the leader election service is
             // fully initialized.
+            LOG.warn("Leader election service isn't initialized yet. Returning empty result to lookup.");

Review comment:
       Do we have some meaningful context to add for whoever reads the logs?




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761703749



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }

Review comment:
       done




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761812586



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,22 +639,36 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
-                }
-                return true;
+        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+        Set<String> availableBrokers = getAvailableBrokers();
+        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
             }
+            return true;
+        } else {
+            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+                    candidateBroker, candidateBrokerHostAndPort,
+                    availableBrokers.stream().collect(Collectors.joining(",")));
+            return false;
         }
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+    private String parseHostAndPort(String candidateBroker) {

Review comment:
       I've tried to keep the patch minimal. I'll just make it static for now.




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761388548



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+            LOG.debug("Broker {} couldn't be found in available brokers {}",
+                    candidateBroker, availableBrokers);
         }
         return false;
     }
 
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {

Review comment:
       @merlimat I guess that shouldn't be done as part of this PR? I'd like to get this PR into branch-2.8 and branch-2.9 as well, to improve topic ownership lookups there.




[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761696667



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);

Review comment:
       Done. I also fixed the other location where `URI.create` was used.




[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985300776


   My 2.8 test is invalid; I'll fix it and verify.


[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985316099


   The problem happens here: https://github.com/apache/pulsar/blob/0b9d51bb49a9fb7a94ba5d38b52dc7ea287f7731/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java#L104-L110


[GitHub] [pulsar] eolivelli commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r760250358



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+            LOG.debug("Broker {} couldn't be found in available brokers {}",
+                    candidateBroker, availableBrokers);
         }
         return false;
     }
 
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {

Review comment:
       Can we avoid catching `Exception`?
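       If the underlying method is declared `throws Exception`, a caller can only avoid catching `Exception` by propagating it or by narrowing that signature. A sketch of the narrower version, assuming a hypothetical `BrokerSource` interface and `BrokerListUnavailableException` (Pulsar's actual `LoadManager` signature may differ): only the declared checked exception is caught and wrapped, while `RuntimeException`s and `Error`s propagate unchanged.

       ```java
       import java.util.Set;

       // Hypothetical checked exception for "the broker list couldn't be read".
       class BrokerListUnavailableException extends Exception {
           BrokerListUnavailableException(String message, Throwable cause) {
               super(message, cause);
           }
       }

       // Hypothetical source of broker names with a narrow throws clause.
       interface BrokerSource {
           Set<String> getAvailableBrokers() throws BrokerListUnavailableException;
       }

       final class AvailableBrokers {
           private AvailableBrokers() {}

           // Catches only the declared checked exception; unchecked exceptions and
           // Errors are not caught here and reach the caller as-is.
           static Set<String> getAvailableBrokers(BrokerSource source) {
               try {
                   return source.getAvailableBrokers();
               } catch (BrokerListUnavailableException e) {
                   throw new IllegalStateException("Unable to get the list of available brokers", e);
               }
           }
       }
       ```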




[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985309595


   I fixed the test in https://github.com/lhotari/pulsar/commits/lh-fix-topic-ownership-assignment-branch-2.8 and can now consistently reproduce the issue. It results in an HTTP 500 error:
   
   ```
   10:22:44.069 [metadata-store-150-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-220-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-80-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-45-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-325-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-185-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-255-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-290-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.069 [metadata-store-115-1] INFO  org.apache.pulsar.common.naming.NamespaceBundleFactory - Policy updated for namespace public/default, refreshing the bundle cache.
   10:22:44.070 [metadata-store-255-1] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup broker for topic persistent://public/default/lookuptest0e133752-6e62-415f-994b-ef5d9ad57730-0: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$storePut$15(ZKMetadataStore.java:226) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:308) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	... 5 more
   Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at org.apache.zookeeper.KeeperException.create(KeeperException.java:122) ~[zookeeper-3.6.3.jar:3.6.3]
   	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[zookeeper-3.6.3.jar:3.6.3]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:304) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	... 5 more
   10:22:44.070 [metadata-store-185-1] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup broker for topic persistent://public/default/lookuptest0e133752-6e62-415f-994b-ef5d9ad57730-0: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$storePut$15(ZKMetadataStore.java:226) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:308) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	... 5 more
   Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/local-policies/public/default
   	at org.apache.zookeeper.KeeperException.create(KeeperException.java:122) ~[zookeeper-3.6.3.jar:3.6.3]
   	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) ~[zookeeper-3.6.3.jar:3.6.3]
   	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:304) ~[pulsar-metadata-2.8.2.jar:2.8.2]
   	... 5 more
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761425446



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }

Review comment:
       Nit: we could use `availableBrokers.contains(candidateBrokerHostAndPort)` here, which could be a minor optimization for clusters with many brokers and many topics getting loaded.
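Here is a minimal sketch of the nit above. It assumes the broker list has already been parsed into `host:port` strings. The loop from the diff and the suggested `contains` call return the same answer. For a hash-based `Set`, though, `contains` is an expected constant-time lookup rather than a scan over every broker. The class and method names are hypothetical, not Pulsar APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ContainsVsLoop {
    // Linear scan, as in the diff under review: one string comparison per broker.
    public static boolean isAvailableByLoop(String hostAndPort, Set<String> availableBrokers) {
        for (String brokerHostPort : availableBrokers) {
            if (hostAndPort.equals(brokerHostPort)) {
                return true;
            }
        }
        return false;
    }

    // Suggested variant: for a hash-based Set this is an expected O(1) lookup.
    public static boolean isAvailableByContains(String hostAndPort, Set<String> availableBrokers) {
        return availableBrokers.contains(hostAndPort);
    }

    public static void main(String[] args) {
        Set<String> brokers = new HashSet<>(List.of("broker-1:8080", "broker-2:8080"));
        System.out.println(isAvailableByLoop("broker-2:8080", brokers));     // true
        System.out.println(isAvailableByContains("broker-2:8080", brokers)); // true
        System.out.println(isAvailableByContains("broker-3:8080", brokers)); // false
    }
}
```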







[GitHub] [pulsar] eolivelli commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761813290



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,22 +639,36 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
-                }
-                return true;
+        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+        Set<String> availableBrokers = getAvailableBrokers();
+        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
             }
+            return true;
+        } else {
+            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+                    candidateBroker, candidateBrokerHostAndPort,
+                    availableBrokers.stream().collect(Collectors.joining(",")));
+            return false;
         }
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+    private String parseHostAndPort(String candidateBroker) {

Review comment:
       Makes sense to me, especially because we have to cherry-pick this to a few older branches.







[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r760275208



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+            LOG.debug("Broker {} couldn't be found in available brokers {}",
+                    candidateBroker, availableBrokers);
         }
         return false;
     }
 
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {

Review comment:
       Not without changing the `LoadManager` interface. The `getAvailableBrokers` method throws `Exception`: https://github.com/apache/pulsar/blob/9c4f182468e4e4faddd3928d4a0b85d3cf3565ec/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java#L116







[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985296687


   I converted this PR to a draft since there are more problems to address.
   
   I started a separate branch targeting 2.8.x: https://github.com/lhotari/pulsar/commits/lh-fix-topic-ownership-assignment-branch-2.8. It currently contains a test case that reproduces the error 404 and error 500 issues.
   
   The problems are different when using a real ZooKeeper server. In the port of this fix to branch-2.8, I made a change to use TestZKServer from pulsar-metadata: https://github.com/lhotari/pulsar/commit/d44bf6faaca922182b3178b4de17397bdfe7d4ce.
   That produces issues that seem to match the analysis made by @zbentley in https://github.com/apache/pulsar/issues/12552#issuecomment-955748696.





[GitHub] [pulsar] merlimat commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r760352866



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);
+        String candidateBrokerHostAndPort = uri.getHost() + ":" + uri.getPort();
+        Set<String> availableBrokers = getAvailableBrokers();
+        for (String brokerHostPort : availableBrokers) {
+            if (candidateBrokerHostAndPort.equals(brokerHostPort)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
+                    LOG.debug("Broker {} is available.", brokerHostPort);
                 }
                 return true;
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+            LOG.debug("Broker {} couldn't be found in available brokers {}",
+                    candidateBroker, availableBrokers);
         }
         return false;
     }
 
+    private Set<String> getAvailableBrokers() {
+        try {
+            return loadManager.get().getAvailableBrokers();
+        } catch (Exception e) {

Review comment:
       It should probably get converted into a `CompletableFuture<List<String>> getAvailableBrokersAsync()`.
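Here is a rough sketch of what such an async variant could look like. It assumes a hypothetical `BrokerSource` interface that mirrors the synchronous `Set<String> getAvailableBrokers() throws Exception` shape from the diff. The comment above suggests `List<String>`; `Set` is used here only to match the existing method. The checked exception becomes an exceptionally completed future instead of a `try`/`catch` at every call site.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class AsyncBrokersSketch {
    // Hypothetical interface mirroring the synchronous, checked-exception method shape.
    public interface BrokerSource {
        Set<String> getAvailableBrokers() throws Exception;

        // Hypothetical async variant: failures complete the future exceptionally instead of throwing.
        default CompletableFuture<Set<String>> getAvailableBrokersAsync() {
            try {
                return CompletableFuture.completedFuture(getAvailableBrokers());
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        }
    }

    public static void main(String[] args) {
        BrokerSource ok = () -> Set.of("broker-1:8080");
        BrokerSource failing = () -> {
            throw new Exception("metadata store unavailable");
        };

        ok.getAvailableBrokersAsync()
                .thenAccept(brokers -> System.out.println("brokers: " + brokers));
        failing.getAvailableBrokersAsync()
                .exceptionally(ex -> {
                    System.out.println("failed: " + ex.getMessage());
                    return null;
                });
    }
}
```

This default method still calls the blocking method on the caller's thread; it only changes how failures propagate. A genuinely non-blocking version would also need the underlying metadata reads to be asynchronous.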







[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761662549



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);

Review comment:
       Actually, good point, @eolivelli. I'll modify it to search for `://` and take the remaining characters.
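Here is a minimal sketch of the `://`-based approach. It is written as a `static` method on a hypothetical utility class, which also lines up with later review suggestions. It assumes broker URLs of the form `scheme://host:port` with no path component. Anything after `://`, including a trailing `/`, is returned as-is.

```java
public final class BrokerUrlUtils {
    private BrokerUrlUtils() {
        // Static utility class; not meant to be instantiated.
    }

    // Hypothetical static helper: drops everything up to and including "://" and returns the rest,
    // e.g. "http://broker-1:8080" -> "broker-1:8080". Input without "://" is returned unchanged.
    public static String parseHostAndPort(String brokerUrl) {
        int schemeSeparator = brokerUrl.indexOf("://");
        if (schemeSeparator < 0) {
            return brokerUrl;
        }
        return brokerUrl.substring(schemeSeparator + "://".length());
    }

    public static void main(String[] args) {
        System.out.println(parseHostAndPort("http://broker-1:8080"));  // broker-1:8080
        System.out.println(parseHostAndPort("https://broker-1:8443")); // broker-1:8443
    }
}
```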







[GitHub] [pulsar] eolivelli commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761809476



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,22 +639,36 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
-                }
-                return true;
+        String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+        Set<String> availableBrokers = getAvailableBrokers();
+        if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
             }
+            return true;
+        } else {
+            LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+                    candidateBroker, candidateBrokerHostAndPort,
+                    availableBrokers.stream().collect(Collectors.joining(",")));
+            return false;
         }
+    }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Broker not found for SLA Monitoring Namespace {}",
-                    candidateBroker + ":" + config.getWebServicePort());
+    private String parseHostAndPort(String candidateBroker) {

Review comment:
       What about moving this method to a utility class?
   
   Also, we should make it `static`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {

Review comment:
       Why `Error`? Like `OutOfMemoryError`?
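Some context on the `Error` check: the `java.lang.Error` Javadoc describes errors as serious problems that a reasonable application should not try to catch. Retrying after an `OutOfMemoryError` or `StackOverflowError` is therefore unlikely to help.

The sketch below shows the same retry-until-deadline pattern as `handleLoadBundlesRetry` in generic form. It uses a simple doubling delay in place of Pulsar's `Backoff` class, and the names `retry` and `doAttempt` are hypothetical. It also unwraps `CompletionException` before the check. A failure that propagates through a dependent stage arrives wrapped, so an `instanceof Error` test on the wrapper alone would not match.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryWithDeadline {
    // Retries `attempt` with a doubling delay until it succeeds, the deadline passes,
    // or it fails with an Error (e.g. OutOfMemoryError), which is never retried.
    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> attempt,
                                                 ScheduledExecutorService scheduler,
                                                 long initialDelayMs, long maxDelayMs, long maxRetryMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxRetryMs);
        doAttempt(attempt, scheduler, initialDelayMs, maxDelayMs, deadline, result);
        return result;
    }

    private static <T> void doAttempt(Supplier<CompletableFuture<T>> attempt,
                                      ScheduledExecutorService scheduler,
                                      long delayMs, long maxDelayMs, long deadline,
                                      CompletableFuture<T> result) {
        attempt.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
                return;
            }
            // Failures propagated through dependent stages are wrapped in CompletionException.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            if (cause instanceof Error || System.nanoTime() > deadline) {
                result.completeExceptionally(cause);
            } else {
                long nextDelay = Math.min(delayMs * 2, maxDelayMs);
                scheduler.schedule(() -> doAttempt(attempt, scheduler, nextDelay, maxDelayMs, deadline, result),
                        delayMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky operation: fails twice (like a BadVersion conflict), then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> calls.incrementAndGet() < 3
                ? CompletableFuture.failedFuture(new IllegalStateException("BadVersion"))
                : CompletableFuture.completedFuture("loaded");
        String value = retry(flaky, scheduler, 10, 100, 5_000).join();
        System.out.println(value + " after " + calls.get() + " attempts"); // loaded after 3 attempts
        scheduler.shutdown();
    }
}
```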

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
         }
 
         CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+        doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+        return future;
+    }
+
+    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
+                               Backoff backoff, long retryDeadline) {
         // Read the static bundle data from the policies
         pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
             if (result.isPresent()) {
                 try {
                     future.complete(readBundles(namespace,
                             result.get().getValue(), result.get().getStat().getVersion()));
                 } catch (IOException e) {
-                    future.completeExceptionally(e);
+                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
                 }
             } else {
                 // If no local policies defined for namespace, copy from global config
                 copyToLocalPolicies(namespace)
                         .thenAccept(b -> future.complete(b))
                         .exceptionally(ex -> {
-                            future.completeExceptionally(ex);
+                            handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
                             return null;
                         });
             }
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        return future;
+    }
+
+    private void handleLoadBundlesRetry(NamespaceName namespace,
+                                        CompletableFuture<NamespaceBundles> future,
+                                        Backoff backoff, long retryDeadline, Throwable e) {
+        if (e instanceof Error || System.nanoTime() > retryDeadline) {
+            future.completeExceptionally(e);
+        } else {
+            LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
+            long retryDelay = backoff.next();
+            pulsar.getExecutor().schedule(() ->
+                    doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private Backoff createBackoff() {

Review comment:
       `static`?







[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985370487


   @codelipenghui @eolivelli Please review the recent changes.





[GitHub] [pulsar] lhotari commented on a change in pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761661783



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,24 +640,33 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
     }
 
     private boolean isBrokerActive(String candidateBroker) {
-        List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
-        for (String brokerHostPort : brokers) {
-            if (candidateBroker.equals("http://" + brokerHostPort)) {
+        URI uri = URI.create(candidateBroker);

Review comment:
       It's useful since the input is a URL (and therefore also a URI). I checked the implementation of URI parsing and it seems very efficient. If this ever becomes a bottleneck, the result could be cached.
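The small hypothetical helper below illustrates one detail of the URI-based variant. `java.net.URI.getPort()` returns `-1` when the URL has no explicit port. A URL without a port would therefore produce `host:-1` and never match an entry in the available-brokers set. Similarly, a hostname that is not a valid RFC 2396 server name (for example, one containing `_`) can make `getHost()` return `null`.

```java
import java.net.URI;

public class UriHostPort {
    // URI-based parsing, as in the earlier revision of the diff.
    public static String viaUri(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        return uri.getHost() + ":" + uri.getPort();
    }

    public static void main(String[] args) {
        System.out.println(viaUri("http://broker-1:8080")); // broker-1:8080
        // getPort() returns -1 when the URL has no explicit port, so this prints "broker-1:-1".
        System.out.println(viaUri("http://broker-1"));
    }
}
```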







[GitHub] [pulsar] lhotari edited a comment on pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari edited a comment on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985317418


   Exception gets handled here: https://github.com/apache/pulsar/blob/e1fbccdd7cf0a6f6ed29f6fadeb498524b9635b4/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L205-L208
   and here:
   https://github.com/apache/pulsar/blob/5dc5de849b582c7f312764db043da9483c17146a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L126-L130





[GitHub] [pulsar] lhotari commented on pull request #13069: [Broker] Fix and improve topic ownership assignment

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#issuecomment-985317418


   Exception gets swallowed here: https://github.com/apache/pulsar/blob/e1fbccdd7cf0a6f6ed29f6fadeb498524b9635b4/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L205-L208

