You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/12 17:39:11 UTC
[pulsar] branch master updated: [pulsar-broker] Close RateLimiter
instance (#5155)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b687fed [pulsar-broker] Close RateLimiter instance (#5155)
b687fed is described below
commit b687fed530bc61aae12d3335b7e6ec4f68506ca3
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Fri Sep 13 02:39:03 2019 +0900
[pulsar-broker] Close RateLimiter instance (#5155)
* Close RateLimiter instance
* Restore AbstractDispatcherSingleActiveConsumer#close()
* Fix NonPersistentSubscription#delete()
---
.../nonpersistent/NonPersistentSubscription.java | 26 +++++++++---
.../PersistentDispatcherMultipleConsumers.java | 5 +++
.../PersistentDispatcherSingleActiveConsumer.java | 10 +++++
.../service/persistent/PersistentReplicator.java | 29 +++++++++++++
.../service/persistent/PersistentSubscription.java | 22 ++++++++--
.../broker/service/persistent/PersistentTopic.java | 25 ++++++++----
.../client/api/MessageDispatchThrottlingTest.java | 46 +++++++++++++++++++++
.../SubscriptionMessageDispatchThrottlingTest.java | 47 ++++++++++++++++++++++
8 files changed, 193 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a96e6a3..237aaee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -249,7 +249,9 @@ public class NonPersistentSubscription implements Subscription {
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);
- dispatcher.reset();
+ if (dispatcher != null) {
+ dispatcher.reset();
+ }
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
@@ -272,13 +274,27 @@ public class NonPersistentSubscription implements Subscription {
log.info("[{}][{}] Unsubscribing", topicName, subName);
// cursor close handles pending delete (ack) operations
- this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
- .exceptionally(exception -> {
+ this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
+ synchronized (this) {
+ (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
+ log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
+ deleteFuture.complete(null);
+ }).exceptionally(ex -> {
IS_FENCED_UPDATER.set(this, FALSE);
- log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
- deleteFuture.completeExceptionally(exception);
+ if (dispatcher != null) {
+ dispatcher.reset();
+ }
+ log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
+ deleteFuture.completeExceptionally(ex);
return null;
});
+ }
+ }).exceptionally(exception -> {
+ IS_FENCED_UPDATER.set(this, FALSE);
+ log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
+ deleteFuture.completeExceptionally(exception);
+ return null;
+ });
return deleteFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 91e4ab7..1019413 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -370,6 +370,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (delayedDeliveryTracker.isPresent()) {
delayedDeliveryTracker.get().close();
}
+
+ if (dispatchRateLimiter.isPresent()) {
+ dispatchRateLimiter.get().close();
+ }
+
return disconnectAllConsumers();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 75d2b49..b227869 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -514,5 +515,14 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
}
+ @Override
+ public CompletableFuture<Void> close() {
+ IS_CLOSED_UPDATER.set(this, TRUE);
+ if (dispatchRateLimiter.isPresent()) {
+ dispatchRateLimiter.get().close();
+ }
+ return disconnectAllConsumers();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index d33eeff..9cc7942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -26,6 +26,7 @@ import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -45,6 +46,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
@@ -681,6 +683,33 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
}
@Override
+ public CompletableFuture<Void> disconnect() {
+ return disconnect(false);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
+ super.disconnect(failIfHasBacklog).thenRun(() -> {
+ if (dispatchRateLimiter.isPresent()) {
+ dispatchRateLimiter.get().close();
+ }
+ future.complete(null);
+ }).exceptionally(ex -> {
+ Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
+ if (t instanceof TopicBusyException == false) {
+ log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster,
+ remoteCluster, ex.getMessage());
+ }
+ future.completeExceptionally(t);
+ return null;
+ });
+
+ return future;
+ }
+
+ @Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4a4e3aa..9243168 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -785,13 +785,27 @@ public class PersistentSubscription implements Subscription {
log.info("[{}][{}] Unsubscribing", topicName, subName);
// cursor close handles pending delete (ack) operations
- this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
- .exceptionally(exception -> {
+ this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
+ synchronized (this) {
+ (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
+ log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
+ deleteFuture.complete(null);
+ }).exceptionally(ex -> {
IS_FENCED_UPDATER.set(this, FALSE);
- log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
- deleteFuture.completeExceptionally(exception);
+ if (dispatcher != null) {
+ dispatcher.reset();
+ }
+ log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
+ deleteFuture.completeExceptionally(ex);
return null;
});
+ }
+ }).exceptionally(exception -> {
+ IS_FENCED_UPDATER.set(this, FALSE);
+ log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
+ deleteFuture.completeExceptionally(exception);
+ return null;
+ });
return deleteFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 761e301..6fffe68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -775,6 +775,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);
+
+ if (dispatchRateLimiter.isPresent()) {
+ dispatchRateLimiter.get().close();
+ }
+
+ if (subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.get().close();
+ }
+
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
@@ -850,6 +859,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
ctrl.close();
}
+ if (dispatchRateLimiter.isPresent()) {
+ dispatchRateLimiter.get().close();
+ }
+
+ if (subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.get().close();
+ }
+
log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
@@ -861,14 +878,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
closeFuture.complete(null);
}
}, null);
-
- if (dispatchRateLimiter.isPresent()) {
- dispatchRateLimiter.get().close();
- }
- if (subscribeRateLimiter.isPresent()) {
- subscribeRateLimiter.get().close();
- }
-
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index cc0f46e..6d139f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -847,6 +848,51 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ @Test(dataProvider = "subscriptions", timeOut = 10000)
+ public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String namespace = "my-property/throttling_ns";
+ final String topicName = "persistent://" + namespace + "/closingRateLimiter" + subscription.name();
+ final String subName = "mySubscription" + subscription.name();
+
+ DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setDispatchRate(namespace, dispatchRate);
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscriptionType(subscription).subscribe();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+ final int numProducedMessages = 10;
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ consumer.acknowledge(msg);
+ }
+
+ Assert.assertTrue(topic.getDispatchRateLimiter().isPresent());
+ DispatchRateLimiter dispatchRateLimiter = topic.getDispatchRateLimiter().get();
+
+ producer.close();
+ consumer.unsubscribe();
+ consumer.close();
+ topic.close().get();
+
+ // Make sure that the rate limiter is closed
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
statsUpdaterField.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index cbcef08..7ed88f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.slf4j.Logger;
@@ -550,4 +551,50 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(dataProvider = "subscriptions", timeOut = 10000)
+ public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String namespace = "my-property/throttling_ns";
+ final String topicName = "persistent://" + namespace + "/closingSubRateLimiter" + subscription.name();
+ final String subName = "mySubscription" + subscription.name();
+
+ DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .subscriptionType(subscription).subscribe();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ PersistentSubscription sub = topic.getSubscription(subName);
+
+ final int numProducedMessages = 10;
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ for (int i = 0; i < numProducedMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ consumer.acknowledge(msg);
+ }
+
+ Dispatcher dispatcher = sub.getDispatcher();
+ Assert.assertTrue(dispatcher.getRateLimiter().isPresent());
+ DispatchRateLimiter dispatchRateLimiter = dispatcher.getRateLimiter().get();
+
+ producer.close();
+ consumer.close();
+ sub.disconnect().get();
+
+ // Make sure that the rate limiter is closed
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
+ Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}