You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/03 02:08:09 UTC

(pulsar) branch branch-3.0 updated: [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0404d6187ef [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)
0404d6187ef is described below

commit 0404d6187ef3c9db1ee2ca80e21058243b45e990
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Apr 2 19:08:04 2024 -0700

    [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)
---
 .../extensions/manager/UnloadManager.java          | 20 +++++++++-
 .../pulsar/broker/service/BrokerService.java       | 11 +++++-
 .../pulsar/broker/web/PulsarWebResource.java       |  7 +++-
 .../ExtensibleLoadManagerImplBaseTest.java         |  4 --
 .../extensions/ExtensibleLoadManagerImplTest.java  | 27 +++++++++++++
 .../extensions/manager/UnloadManagerTest.java      | 44 ++++++++++++----------
 6 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index ffdbbc2af42..bf9885b2a25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.manager;
 
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
 import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import java.util.Map;
@@ -25,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
@@ -88,14 +91,27 @@ public class UnloadManager implements StateChangeListener {
 
     @Override
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
-        if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+
+        if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
+            }
+            return;
+        }
+
+        if (t != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
             }
             this.complete(serviceUnit, t);
             return;
         }
-        ServiceUnitState state = ServiceUnitStateData.state(data);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Handling {} for service unit {}", data, serviceUnit);
+        }
+
         switch (state) {
             case Free, Owned -> this.complete(serviceUnit, t);
             default -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fbec9bf413a..bfb289678d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2277,9 +2277,18 @@ public class BrokerService implements Closeable {
                 }
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;
+                            }
+                            throw FutureUtil.wrapToCompletionException(e);
+                        }));
             }
         });
+
         if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
                 && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
             TransactionMetadataStoreService metadataStoreService =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c2ac73d39f5..ce5685f5017 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -610,12 +610,17 @@ public abstract class PulsarWebResource {
         NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
         NamespaceService nsService = pulsar().getNamespaceService();
 
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            return nsService.checkOwnershipPresentAsync(nsBundle);
+        }
+
         LookupOptions options = LookupOptions.builder()
                 .authoritative(false)
                 .requestHttps(isRequestHttps())
                 .readOnly(true)
                 .loadTopicsInBundle(false).build();
-        return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());
+
+        return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
     }
 
     protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 3353fbb96f2..c74820e69a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -63,13 +62,10 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
 
     protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
         conf.setForceDeleteNamespaceAllowed(true);
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
         conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
         conf.setLoadBalancerSheddingEnabled(false);
         conf.setLoadBalancerDebugModeEnabled(true);
-        conf.setTopicLevelPoliciesEnabled(true);
         return conf;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 87c199676e0..ef6d267b450 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -64,6 +64,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -236,6 +237,32 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadUponTopicLookupFailure() throws Exception {
+        TopicName topicName =
+                TopicName.get("public/test/testUnloadUponTopicLookupFailure");
+        NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
+        primaryLoadManager.assign(Optional.empty(), bundle).get();
+
+        CompletableFuture future1 = new CompletableFuture();
+        CompletableFuture future2 = new CompletableFuture();
+        try {
+            pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
+            pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
+            CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
+                future1.completeExceptionally(new CompletionException(
+                        new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+                future2.completeExceptionally(new CompletionException(
+                        new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+            });
+            admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
+        } finally {
+            pulsar1.getBrokerService().getTopics().remove(topicName.toString());
+            pulsar2.getBrokerService().getTopics().remove(topicName.toString());
+        }
+    }
+
+
     @Test(timeOut = 30 * 1000)
     public void testUnloadAdminAPI() throws Exception {
         TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 6a2ae1cc562..45b1cd9544f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -94,53 +94,59 @@ public class UnloadManagerTest {
     public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
         UnloadCounter counter = new UnloadCounter();
         UnloadManager manager = new UnloadManager(counter);
+        String dstBroker = "broker-2";
+        String srcBroker = "broker-1";
+        String bundle = "bundle-1";
         var unloadDecision =
-                new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
+                new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin);
         CompletableFuture<Void> future =
                 manager.waitAsync(CompletableFuture.completedFuture(null),
-                        "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+                        bundle, unloadDecision, 5, TimeUnit.SECONDS);
         Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
 
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
         future.get();
         assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
 
         // Success with Owned state.
         future = manager.waitAsync(CompletableFuture.completedFuture(null),
-                "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+                bundle, unloadDecision, 5, TimeUnit.SECONDS);
         inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
 
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
-        future.get();
 
+        future.get();
         assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
     }
 
@@ -158,7 +164,7 @@ public class UnloadManagerTest {
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
         manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT),
+                new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT),
                 new IllegalStateException("Failed stage."));
 
         try {