You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/16 07:35:07 UTC
(pulsar) branch master updated: [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 203f305bf44 [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
203f305bf44 is described below
commit 203f305bf449dd335b39501177f210cfcb73d5fa
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Apr 16 00:34:59 2024 -0700
[fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
---
.../channel/ServiceUnitStateChannelImpl.java | 13 ++++++-------
.../pulsar/broker/namespace/NamespaceService.java | 5 +++++
.../service/nonpersistent/NonPersistentTopic.java | 3 ++-
.../broker/service/persistent/PersistentTopic.java | 3 ++-
.../pulsar/broker/service/ReplicatorGlobalNSTest.java | 16 ++++++++++++++++
.../pulsar/broker/service/ReplicatorTestBase.java | 19 +++++++++++++++++--
6 files changed, 48 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 68b38080e73..e355187af4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -88,7 +88,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1381,8 +1380,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
try {
- producer.flush();
- } catch (PulsarClientException e) {
+ producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+ } catch (Exception e) {
log.error("Failed to flush the in-flight non-system bundle override messages.", e);
}
@@ -1405,8 +1404,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
try {
- producer.flush();
- } catch (PulsarClientException e) {
+ producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+ } catch (Exception e) {
log.error("Failed to flush the in-flight system bundle override messages.", e);
}
@@ -1584,8 +1583,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
try {
- producer.flush();
- } catch (PulsarClientException e) {
+ producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+ } catch (Exception e) {
log.error("Failed to flush the in-flight messages.", e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7c62f264c78..3e7bb9560e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -824,6 +824,11 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+ ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
+ return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
+ .thenApply(Optional::isPresent);
+ }
return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0ac06d6883f..9a3a0a7d83d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -589,7 +589,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
- if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+ if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+ || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e4441969101..936091edce5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1704,7 +1704,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
- if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+ if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+ || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index 4296f3f4168..eed849ef1a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -32,6 +34,8 @@ import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
@@ -41,6 +45,18 @@ import java.util.concurrent.TimeUnit;
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
protected String methodName;
+ @DataProvider(name = "loadManagerClassName")
+ public static Object[][] loadManagerClassName() {
+ return new Object[][]{
+ {ModularLoadManagerImpl.class.getName()},
+ {ExtensibleLoadManagerImpl.class.getName()}
+ };
+ }
+
+ @Factory(dataProvider = "loadManagerClassName")
+ public ReplicatorGlobalNSTest(String loadManagerClassName) {
+ this.loadManagerClassName = loadManagerClassName;
+ }
@BeforeMethod
public void beforeMethod(Method m) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 11d663ff9f4..ba9f850ff0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
protected final String cluster2 = "r2";
protected final String cluster3 = "r3";
protected final String cluster4 = "r4";
+ protected String loadManagerClassName;
+
+ protected String getLoadManagerClassName() {
+ return loadManagerClassName;
+ }
// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
@@ -271,8 +276,9 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
.brokerClientTlsTrustStoreType(keyStoreType)
.build());
- admin1.tenants().createTenant("pulsar",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
+ updateTenantInfo("pulsar",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
+ Sets.newHashSet("r1", "r2", "r3")));
admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
@@ -344,6 +350,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+ config.setLoadManagerClassName(getLoadManagerClassName());
}
public void resetConfig1() {
@@ -409,6 +416,14 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
resetConfig4();
}
+ protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception {
+ if (!admin1.tenants().getTenants().contains(tenant)) {
+ admin1.tenants().createTenant(tenant, tenantInfo);
+ } else {
+ admin1.tenants().updateTenant(tenant, tenantInfo);
+ }
+ }
+
static class MessageProducer implements AutoCloseable {
URL url;
String namespace;