You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/03 02:08:09 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0404d6187ef [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)
0404d6187ef is described below
commit 0404d6187ef3c9db1ee2ca80e21058243b45e990
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Apr 2 19:08:04 2024 -0700
[fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) (#22403)
---
.../extensions/manager/UnloadManager.java | 20 +++++++++-
.../pulsar/broker/service/BrokerService.java | 11 +++++-
.../pulsar/broker/web/PulsarWebResource.java | 7 +++-
.../ExtensibleLoadManagerImplBaseTest.java | 4 --
.../extensions/ExtensibleLoadManagerImplTest.java | 27 +++++++++++++
.../extensions/manager/UnloadManagerTest.java | 44 ++++++++++++----------
6 files changed, 86 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index ffdbbc2af42..bf9885b2a25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import java.util.Map;
@@ -25,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
@@ -88,14 +91,27 @@ public class UnloadManager implements StateChangeListener {
@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
- if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+ ServiceUnitState state = ServiceUnitStateData.state(data);
+
+ if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
+ }
+ return;
+ }
+
+ if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
return;
}
- ServiceUnitState state = ServiceUnitStateData.state(data);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {}", data, serviceUnit);
+ }
+
switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
default -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fbec9bf413a..bfb289678d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2277,9 +2277,18 @@ public class BrokerService implements Closeable {
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
- : CompletableFuture.completedFuture(null)));
+ : CompletableFuture.completedFuture(null))
+ .exceptionally(e -> {
+ if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+ && e.getMessage().contains("Please redo the lookup")) {
+ log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(e);
+ }));
}
});
+
if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c2ac73d39f5..ce5685f5017 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -610,12 +610,17 @@ public abstract class PulsarWebResource {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+ return nsService.checkOwnershipPresentAsync(nsBundle);
+ }
+
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
- return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());
+
+ return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}
protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 3353fbb96f2..c74820e69a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -63,13 +62,10 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(true);
return conf;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 87c199676e0..ef6d267b450 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -64,6 +64,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -236,6 +237,32 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
assertTrue(brokerLookupData.isPresent());
}
+ @Test(timeOut = 30 * 1000)
+ public void testUnloadUponTopicLookupFailure() throws Exception {
+ TopicName topicName =
+ TopicName.get("public/test/testUnloadUponTopicLookupFailure");
+ NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
+ primaryLoadManager.assign(Optional.empty(), bundle).get();
+
+ CompletableFuture future1 = new CompletableFuture();
+ CompletableFuture future2 = new CompletableFuture();
+ try {
+ pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
+ pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
+ CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
+ future1.completeExceptionally(new CompletionException(
+ new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+ future2.completeExceptionally(new CompletionException(
+ new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+ });
+ admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
+ } finally {
+ pulsar1.getBrokerService().getTopics().remove(topicName.toString());
+ pulsar2.getBrokerService().getTopics().remove(topicName.toString());
+ }
+ }
+
+
@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 6a2ae1cc562..45b1cd9544f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -94,53 +94,59 @@ public class UnloadManagerTest {
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
+ String dstBroker = "broker-2";
+ String srcBroker = "broker-1";
+ String bundle = "bundle-1";
var unloadDecision =
- new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
+ new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin);
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
- "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+ bundle, unloadDecision, 5, TimeUnit.SECONDS);
Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
- "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+ bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+ assertEquals(inFlightUnloadRequestMap.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);
- manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
- future.get();
+ future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
}
@@ -158,7 +164,7 @@ public class UnloadManagerTest {
assertEquals(inFlightUnloadRequestMap.size(), 1);
manager.handleEvent("bundle-1",
- new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT),
+ new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT),
new IllegalStateException("Failed stage."));
try {