You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2023/11/06 01:33:33 UTC

(pulsar) 03/04: resolved comments

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c0eec1e46edeb46c888fa28f27b199ea7e7a1574
Author: Heesung Sohn <he...@streamnative.io>
AuthorDate: Fri Nov 3 15:08:07 2023 -0700

    resolved comments
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  4 ++++
 .../channel/ServiceUnitStateChannelImpl.java       | 12 ++++++------
 .../pulsar/broker/namespace/OwnedBundle.java       |  2 +-
 .../pulsar/broker/service/BrokerService.java       | 10 +++++-----
 .../apache/pulsar/broker/service/ServerCnx.java    |  7 +++----
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 10 +++++-----
 .../broker/service/persistent/PersistentTopic.java | 22 +++++++++++-----------
 .../extensions/ExtensibleLoadManagerImplTest.java  | 20 +++++++++++++++-----
 .../pulsar/client/impl/ConnectionHandler.java      |  2 +-
 10 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 14c81a6a492..67bab9b12ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -267,6 +267,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
     }
 
+    public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
+        return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
+    }
+
     public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
         if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
             throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1fe7b83d77d..cff45b18ec8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -776,7 +776,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             log(null, serviceUnit, data, null);
         } else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
             stateChangeListeners.notifyOnCompletion(
-                            closeServiceUnit(serviceUnit, false), serviceUnit, data)
+                            closeServiceUnit(serviceUnit, true), serviceUnit, data)
                     .whenComplete((__, e) -> log(e, serviceUnit, data, null));
         } else {
             stateChangeListeners.notify(serviceUnit, data, null);
@@ -799,11 +799,11 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             if (isTransferCommand(data)) {
                 next = new ServiceUnitStateData(
                         Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
-                unloadFuture = closeServiceUnit(serviceUnit, true);
+                unloadFuture = closeServiceUnit(serviceUnit, false);
             } else {
                 next = new ServiceUnitStateData(
                         Free, null, data.sourceBroker(), getNextVersionId(data));
-                unloadFuture = closeServiceUnit(serviceUnit, false);
+                unloadFuture = closeServiceUnit(serviceUnit, true);
             }
             stateChangeListeners.notifyOnCompletion(unloadFuture
                             .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
@@ -903,13 +903,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
     }
 
-    private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean closeWithoutDisconnectingClients) {
+    private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean disconnectClients) {
         long startTime = System.nanoTime();
         MutableInt unloadedTopics = new MutableInt();
         NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
         return pulsar.getBrokerService().unloadServiceUnit(
                         bundle,
-                        closeWithoutDisconnectingClients,
+                        disconnectClients,
                         true,
                         pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
                         TimeUnit.MILLISECONDS)
@@ -918,7 +918,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                     return numUnloadedTopics;
                 })
                 .whenComplete((__, ex) -> {
-                    if (!closeWithoutDisconnectingClients) {
+                    if (disconnectClients) {
                         // clean up topics that failed to unload from the broker ownership cache
                         pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
                     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
index a87d45395db..cdedac1136e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
@@ -136,7 +136,7 @@ public class OwnedBundle {
         return pulsar.getNamespaceService().getOwnershipCache()
                 .updateBundleState(this.bundle, false)
                 .thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(
-                        bundle, false, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
+                        bundle, true, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
                 .handle((numUnloadedTopics, ex) -> {
                     if (ex != null) {
                         // ignore topic-close failure to unload bundle
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e74801ca455..da556e4422f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2197,10 +2197,10 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
-            boolean closeWithoutDisconnectingClients,
+            boolean disconnectClients,
             boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
         CompletableFuture<Integer> future = unloadServiceUnit(
-                serviceUnit, closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect);
+                serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect);
         ScheduledFuture<?> taskTimeout = executor().schedule(() -> {
             if (!future.isDone()) {
                 log.warn("Unloading of {} has timed out", serviceUnit);
@@ -2217,13 +2217,13 @@ public class BrokerService implements Closeable {
      * Unload all the topic served by the broker service under the given service unit.
      *
      * @param serviceUnit
-     * @param closeWithoutDisconnectingClients don't disconnect clients
+     * @param disconnectClients whether to disconnect clients from the topics being closed
      * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect
      *                                           and forcefully close managed-ledger
      * @return
      */
     private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
-                                                         boolean closeWithoutDisconnectingClients,
+                                                         boolean disconnectClients,
                                                          boolean closeWithoutWaitingClientDisconnect) {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
         topics.forEach((name, topicFuture) -> {
@@ -2248,7 +2248,7 @@ public class BrokerService implements Closeable {
                 }
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
-                                closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect)
+                                disconnectClients, closeWithoutWaitingClientDisconnect)
                                 : CompletableFuture.completedFuture(null)));
             }
         });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4a43649e902..e0523846423 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1762,11 +1762,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             printSendCommandDebug(send, headersAndPayload);
         }
 
-
-        ServiceConfiguration conf = getBrokerService().pulsar().getConfiguration();
+        PulsarService pulsar = getBrokerService().pulsar();
         if (producer.getTopic().isFenced()
-                && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {
-            long ignoredMsgCount = ExtensibleLoadManagerImpl.get(getBrokerService().pulsar())
+                && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
                     .getIgnoredSendMsgCounter().incrementAndGet();
             if (log.isDebugEnabled()) {
                 log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 41040a9f830..6e2eb75a795 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -197,7 +197,7 @@ public interface Topic {
     CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
 
     CompletableFuture<Void> close(
-            boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect);
+            boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect);
 
     void checkGC();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c299652adf8..0d80de3aa6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -488,19 +488,19 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        return close(false, closeWithoutWaitingClientDisconnect);
+        return close(true, closeWithoutWaitingClientDisconnect);
     }
 
     /**
      * Close this topic - close all producers and subscriptions associated with this topic.
      *
-     * @param closeWithoutDisconnectingClients don't disconnect clients
+     * @param disconnectClients whether to disconnect the topic's clients as part of the close
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(
-            boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
+            boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -519,7 +519,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
-        if (!closeWithoutDisconnectingClients) {
+        if (disconnectClients) {
             futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                     brokerService.getPulsar(), topic).thenAccept(lookupData ->
                     producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
@@ -553,7 +553,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             // so, execute it in different thread
             brokerService.executor().execute(() -> {
 
-                if (!closeWithoutDisconnectingClients) {
+                if (disconnectClients) {
                     brokerService.removeTopicFromCache(NonPersistentTopic.this);
                     unregisterTopicPolicyListener();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 66a8f9da0cc..ba40a75651d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1449,24 +1449,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     public CompletableFuture<Void> close() {
-        return close(false, false);
+        return close(true, false);
     }
 
     @Override
     public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
-        return close(false, closeWithoutWaitingClientDisconnect);
+        return close(true, closeWithoutWaitingClientDisconnect);
     }
 
     /**
      * Close this topic - close all producers and subscriptions associated with this topic.
      *
-     * @param closeWithoutDisconnectingClients don't disconnect clients
+     * @param disconnectClients whether to disconnect the topic's clients as part of the close
      * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
     public CompletableFuture<Void> close(
-            boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
+            boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -1489,7 +1489,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         futures.add(transactionBuffer.closeAsync());
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
         shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
-        if (!closeWithoutDisconnectingClients) {
+        if (disconnectClients) {
             futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                     brokerService.getPulsar(), topic).thenAccept(lookupData ->
                     producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
@@ -1531,21 +1531,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             ledger.asyncClose(new CloseCallback() {
                 @Override
                 public void closeComplete(Object ctx) {
-                    if (closeWithoutDisconnectingClients) {
-                        closeFuture.complete(null);
-                    } else {
+                    if (disconnectClients) {
                         // Everything is now closed, remove the topic from map
                         disposeTopic(closeFuture);
+                    } else {
+                        closeFuture.complete(null);
                     }
                 }
 
                 @Override
                 public void closeFailed(ManagedLedgerException exception, Object ctx) {
                     log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
-                    if (closeWithoutDisconnectingClients) {
-                        closeFuture.complete(null);
-                    } else {
+                    if (disconnectClients) {
                         disposeTopic(closeFuture);
+                    } else {
+                        closeFuture.complete(null);
                     }
                 }
             }, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b8873ebb6bb..158488bc84a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -662,9 +662,11 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testMoreThenOneFilter() throws Exception {
-        TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
+        // Use a separate namespace: unloading the default namespace, and the topic policy lookups
+        // that follow it at the init state step, make this test flaky.
+        String namespace = "public/my-namespace";
+        TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception");
         NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
-
         String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
         doReturn(List.of(new MockBrokerFilter() {
             @Override
@@ -682,10 +684,18 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
                 return FutureUtil.failedFuture(new BrokerFilterException("Test"));
             }
         })).when(primaryLoadManager).getBrokerFilterPipeline();
-
+        admin.namespaces().createNamespace(namespace);
         Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
-        assertTrue(brokerLookupData.isPresent());
-        assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(brokerLookupData.isPresent());
+            assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
+            assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
+                    pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString()));
+            assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
+                    pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
+        });
+
+        admin.namespaces().deleteNamespace(namespace, true);
     }
 
     @Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 7361c27155c..600dc17a1b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -167,7 +167,7 @@ public class ConnectionHandler {
     }
 
     public void connectionClosed(ClientCnx cnx) {
-        connectionClosed(cnx, null, Optional.empty());
+        connectionClosed(cnx, Optional.empty(), Optional.empty());
     }
 
     public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {