You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2023/11/06 01:33:33 UTC
(pulsar) 03/04: resolved comments
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c0eec1e46edeb46c888fa28f27b199ea7e7a1574
Author: Heesung Sohn <he...@streamnative.io>
AuthorDate: Fri Nov 3 15:08:07 2023 -0700
resolved comments
---
.../extensions/ExtensibleLoadManagerImpl.java | 4 ++++
.../channel/ServiceUnitStateChannelImpl.java | 12 ++++++------
.../pulsar/broker/namespace/OwnedBundle.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 10 +++++-----
.../apache/pulsar/broker/service/ServerCnx.java | 7 +++----
.../org/apache/pulsar/broker/service/Topic.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 10 +++++-----
.../broker/service/persistent/PersistentTopic.java | 22 +++++++++++-----------
.../extensions/ExtensibleLoadManagerImplTest.java | 20 +++++++++++++++-----
.../pulsar/client/impl/ConnectionHandler.java | 2 +-
10 files changed, 52 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 14c81a6a492..67bab9b12ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -267,6 +267,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
+ public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
+ return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
+ }
+
public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 1fe7b83d77d..cff45b18ec8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -776,7 +776,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
log(null, serviceUnit, data, null);
} else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
- closeServiceUnit(serviceUnit, false), serviceUnit, data)
+ closeServiceUnit(serviceUnit, true), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
@@ -799,11 +799,11 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
- unloadFuture = closeServiceUnit(serviceUnit, true);
+ unloadFuture = closeServiceUnit(serviceUnit, false);
} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(), getNextVersionId(data));
- unloadFuture = closeServiceUnit(serviceUnit, false);
+ unloadFuture = closeServiceUnit(serviceUnit, true);
}
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
@@ -903,13 +903,13 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
}
}
- private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean closeWithoutDisconnectingClients) {
+ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean disconnectClients) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
- closeWithoutDisconnectingClients,
+ disconnectClients,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
@@ -918,7 +918,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
return numUnloadedTopics;
})
.whenComplete((__, ex) -> {
- if (!closeWithoutDisconnectingClients) {
+ if (disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
index a87d45395db..cdedac1136e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
@@ -136,7 +136,7 @@ public class OwnedBundle {
return pulsar.getNamespaceService().getOwnershipCache()
.updateBundleState(this.bundle, false)
.thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(
- bundle, false, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
+ bundle, true, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
.handle((numUnloadedTopics, ex) -> {
if (ex != null) {
// ignore topic-close failure to unload bundle
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e74801ca455..da556e4422f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2197,10 +2197,10 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
- boolean closeWithoutDisconnectingClients,
+ boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
CompletableFuture<Integer> future = unloadServiceUnit(
- serviceUnit, closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect);
+ serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect);
ScheduledFuture<?> taskTimeout = executor().schedule(() -> {
if (!future.isDone()) {
log.warn("Unloading of {} has timed out", serviceUnit);
@@ -2217,13 +2217,13 @@ public class BrokerService implements Closeable {
* Unload all the topic served by the broker service under the given service unit.
*
* @param serviceUnit
- * @param closeWithoutDisconnectingClients don't disconnect clients
+ * @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect
* and forcefully close managed-ledger
* @return
*/
private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
- boolean closeWithoutDisconnectingClients,
+ boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect) {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
topics.forEach((name, topicFuture) -> {
@@ -2248,7 +2248,7 @@ public class BrokerService implements Closeable {
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(
- closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect)
+ disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
}
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4a43649e902..e0523846423 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1762,11 +1762,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
printSendCommandDebug(send, headersAndPayload);
}
-
- ServiceConfiguration conf = getBrokerService().pulsar().getConfiguration();
+ PulsarService pulsar = getBrokerService().pulsar();
if (producer.getTopic().isFenced()
- && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {
- long ignoredMsgCount = ExtensibleLoadManagerImpl.get(getBrokerService().pulsar())
+ && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+ long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
.getIgnoredSendMsgCounter().incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 41040a9f830..6e2eb75a795 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -197,7 +197,7 @@ public interface Topic {
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
CompletableFuture<Void> close(
- boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect);
+ boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect);
void checkGC();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c299652adf8..0d80de3aa6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -488,19 +488,19 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
- return close(false, closeWithoutWaitingClientDisconnect);
+ return close(true, closeWithoutWaitingClientDisconnect);
}
/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
- * @param closeWithoutDisconnectingClients don't disconnect clients
+ * @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
- boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
+ boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
lock.writeLock().lock();
@@ -519,7 +519,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
- if (!closeWithoutDisconnectingClients) {
+ if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
@@ -553,7 +553,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
// so, execute it in different thread
brokerService.executor().execute(() -> {
- if (!closeWithoutDisconnectingClients) {
+ if (disconnectClients) {
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 66a8f9da0cc..ba40a75651d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1449,24 +1449,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
public CompletableFuture<Void> close() {
- return close(false, false);
+ return close(true, false);
}
@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
- return close(false, closeWithoutWaitingClientDisconnect);
+ return close(true, closeWithoutWaitingClientDisconnect);
}
/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
- * @param closeWithoutDisconnectingClients don't disconnect clients
+ * @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
- boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
+ boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
lock.writeLock().lock();
@@ -1489,7 +1489,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
- if (!closeWithoutDisconnectingClients) {
+ if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
@@ -1531,21 +1531,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
- if (closeWithoutDisconnectingClients) {
- closeFuture.complete(null);
- } else {
+ if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
+ } else {
+ closeFuture.complete(null);
}
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
- if (closeWithoutDisconnectingClients) {
- closeFuture.complete(null);
- } else {
+ if (disconnectClients) {
disposeTopic(closeFuture);
+ } else {
+ closeFuture.complete(null);
}
}
}, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index b8873ebb6bb..158488bc84a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -662,9 +662,11 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
@Test
public void testMoreThenOneFilter() throws Exception {
- TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
+ // Use a different namespace to avoid flaky test failures
+ // from unloading the default namespace and the following topic policy lookups at the init state step
+ String namespace = "public/my-namespace";
+ TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
-
String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
@@ -682,10 +684,18 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();
-
+ admin.namespaces().createNamespace(namespace);
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
- assertTrue(brokerLookupData.isPresent());
- assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
+ Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertTrue(brokerLookupData.isPresent());
+ assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
+ assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
+ pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString()));
+ assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
+ pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
+ });
+
+ admin.namespaces().deleteNamespace(namespace, true);
}
@Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 7361c27155c..600dc17a1b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -167,7 +167,7 @@ public class ConnectionHandler {
}
public void connectionClosed(ClientCnx cnx) {
- connectionClosed(cnx, null, Optional.empty());
+ connectionClosed(cnx, Optional.empty(), Optional.empty());
}
public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {