You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/16 07:35:07 UTC

(pulsar) branch master updated: [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 203f305bf44 [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
203f305bf44 is described below

commit 203f305bf449dd335b39501177f210cfcb73d5fa
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Apr 16 00:34:59 2024 -0700

    [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496)
---
 .../channel/ServiceUnitStateChannelImpl.java          | 13 ++++++-------
 .../pulsar/broker/namespace/NamespaceService.java     |  5 +++++
 .../service/nonpersistent/NonPersistentTopic.java     |  3 ++-
 .../broker/service/persistent/PersistentTopic.java    |  3 ++-
 .../pulsar/broker/service/ReplicatorGlobalNSTest.java | 16 ++++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java     | 19 +++++++++++++++++--
 6 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 68b38080e73..e355187af4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -88,7 +88,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1381,8 +1380,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight non-system bundle override messages.", e);
         }
 
@@ -1405,8 +1404,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight system bundle override messages.", e);
         }
 
@@ -1584,8 +1583,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
         try {
-            producer.flush();
-        } catch (PulsarClientException e) {
+            producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS);
+        } catch (Exception e) {
             log.error("Failed to flush the in-flight messages.", e);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7c62f264c78..3e7bb9560e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -824,6 +824,11 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
+            return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
+                    .thenApply(Optional::isPresent);
+        }
         return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0ac06d6883f..9a3a0a7d83d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -589,7 +589,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e4441969101..936091edce5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1704,7 +1704,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkReplication() {
         TopicName name = TopicName.get(topic);
-        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
+        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+                || ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             return CompletableFuture.completedFuture(null);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index 4296f3f4168..eed849ef1a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -32,6 +34,8 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
@@ -41,6 +45,18 @@ import java.util.concurrent.TimeUnit;
 public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
 
     protected String methodName;
+    @DataProvider(name = "loadManagerClassName")
+    public static Object[][] loadManagerClassName() {
+        return new Object[][]{
+                {ModularLoadManagerImpl.class.getName()},
+                {ExtensibleLoadManagerImpl.class.getName()}
+        };
+    }
+
+    @Factory(dataProvider = "loadManagerClassName")
+    public ReplicatorGlobalNSTest(String loadManagerClassName) {
+        this.loadManagerClassName = loadManagerClassName;
+    }
 
     @BeforeMethod
     public void beforeMethod(Method m) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 11d663ff9f4..ba9f850ff0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
     protected final String cluster2 = "r2";
     protected final String cluster3 = "r3";
     protected final String cluster4 = "r4";
+    protected String loadManagerClassName;
+
+    protected String getLoadManagerClassName() {
+        return loadManagerClassName;
+    }
 
     // Default frequency
     public int getBrokerServicePurgeInactiveFrequency() {
@@ -271,8 +276,9 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
                 .brokerClientTlsTrustStoreType(keyStoreType)
                 .build());
 
-        admin1.tenants().createTenant("pulsar",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
+        updateTenantInfo("pulsar",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
+                        Sets.newHashSet("r1", "r2", "r3")));
         admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
         admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
 
@@ -344,6 +350,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
         config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
         config.setEnableReplicatedSubscriptions(true);
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setLoadManagerClassName(getLoadManagerClassName());
     }
 
     public void resetConfig1() {
@@ -409,6 +416,14 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
         resetConfig4();
     }
 
+    protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception {
+        if (!admin1.tenants().getTenants().contains(tenant)) {
+            admin1.tenants().createTenant(tenant, tenantInfo);
+        } else {
+            admin1.tenants().updateTenant(tenant, tenantInfo);
+        }
+    }
+
     static class MessageProducer implements AutoCloseable {
         URL url;
         String namespace;