You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/12 17:39:11 UTC

[pulsar] branch master updated: [pulsar-broker] Close RateLimiter instance (#5155)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b687fed  [pulsar-broker] Close RateLimiter instance (#5155)
b687fed is described below

commit b687fed530bc61aae12d3335b7e6ec4f68506ca3
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Fri Sep 13 02:39:03 2019 +0900

    [pulsar-broker] Close RateLimiter instance (#5155)
    
    * Close RateLimiter instance
    
    * Restore AbstractDispatcherSingleActiveConsumer#close()
    
    * Fix NonPersistentSubscription#delete()
---
 .../nonpersistent/NonPersistentSubscription.java   | 26 +++++++++---
 .../PersistentDispatcherMultipleConsumers.java     |  5 +++
 .../PersistentDispatcherSingleActiveConsumer.java  | 10 +++++
 .../service/persistent/PersistentReplicator.java   | 29 +++++++++++++
 .../service/persistent/PersistentSubscription.java | 22 ++++++++--
 .../broker/service/persistent/PersistentTopic.java | 25 ++++++++----
 .../client/api/MessageDispatchThrottlingTest.java  | 46 +++++++++++++++++++++
 .../SubscriptionMessageDispatchThrottlingTest.java | 47 ++++++++++++++++++++++
 8 files changed, 193 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a96e6a3..237aaee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -249,7 +249,9 @@ public class NonPersistentSubscription implements Subscription {
                     disconnectFuture.complete(null);
                 }).exceptionally(exception -> {
                     IS_FENCED_UPDATER.set(this, FALSE);
-                    dispatcher.reset();
+                    if (dispatcher != null) {
+                        dispatcher.reset();
+                    }
                     log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
                             exception);
                     disconnectFuture.completeExceptionally(exception);
@@ -272,13 +274,27 @@ public class NonPersistentSubscription implements Subscription {
         log.info("[{}][{}] Unsubscribing", topicName, subName);
 
         // cursor close handles pending delete (ack) operations
-        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
-                .exceptionally(exception -> {
+        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
+            synchronized (this) {
+                (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
+                    log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
+                    deleteFuture.complete(null);
+                }).exceptionally(ex -> {
                     IS_FENCED_UPDATER.set(this, FALSE);
-                    log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
-                    deleteFuture.completeExceptionally(exception);
+                    if (dispatcher != null) {
+                        dispatcher.reset();
+                    }
+                    log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
+                    deleteFuture.completeExceptionally(ex);
                     return null;
                 });
+            }
+        }).exceptionally(exception -> {
+            IS_FENCED_UPDATER.set(this, FALSE);
+            log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
+            deleteFuture.completeExceptionally(exception);
+            return null;
+        });
 
         return deleteFuture;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 91e4ab7..1019413 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -370,6 +370,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (delayedDeliveryTracker.isPresent()) {
             delayedDeliveryTracker.get().close();
         }
+
+        if (dispatchRateLimiter.isPresent()) {
+            dispatchRateLimiter.get().close();
+        }
+
         return disconnectAllConsumers();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 75d2b49..b227869 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
 
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -514,5 +515,14 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         }
     }
 
+    @Override
+    public CompletableFuture<Void> close() {
+        IS_CLOSED_UPDATER.set(this, TRUE);
+        if (dispatchRateLimiter.isPresent()) {
+            dispatchRateLimiter.get().close();
+        }
+        return disconnectAllConsumers();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index d33eeff..9cc7942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -26,6 +26,7 @@ import io.netty.util.Recycler.Handle;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -45,6 +46,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.client.api.MessageId;
@@ -681,6 +683,33 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
     }
 
     @Override
+    public CompletableFuture<Void> disconnect() {
+        return disconnect(false);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
+        super.disconnect(failIfHasBacklog).thenRun(() -> {
+            if (dispatchRateLimiter.isPresent()) {
+                dispatchRateLimiter.get().close();
+            }
+            future.complete(null);
+        }).exceptionally(ex -> {
+            Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
+            if (t instanceof TopicBusyException == false) {
+                log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster,
+                        remoteCluster, ex.getMessage());
+            }
+            future.completeExceptionally(t);
+            return null;
+        });
+
+        return future;
+    }
+
+    @Override
     public boolean isConnected() {
         ProducerImpl<?> producer = this.producer;
         return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4a4e3aa..9243168 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -785,13 +785,27 @@ public class PersistentSubscription implements Subscription {
         log.info("[{}][{}] Unsubscribing", topicName, subName);
 
         // cursor close handles pending delete (ack) operations
-        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null))
-                .exceptionally(exception -> {
+        this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
+            synchronized (this) {
+                (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
+                    log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
+                    deleteFuture.complete(null);
+                }).exceptionally(ex -> {
                     IS_FENCED_UPDATER.set(this, FALSE);
-                    log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
-                    deleteFuture.completeExceptionally(exception);
+                    if (dispatcher != null) {
+                        dispatcher.reset();
+                    }
+                    log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
+                    deleteFuture.completeExceptionally(ex);
                     return null;
                 });
+            }
+        }).exceptionally(exception -> {
+            IS_FENCED_UPDATER.set(this, FALSE);
+            log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
+            deleteFuture.completeExceptionally(exception);
+            return null;
+        });
 
         return deleteFuture;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 761e301..6fffe68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -775,6 +775,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                                 @Override
                                 public void deleteLedgerComplete(Object ctx) {
                                     brokerService.removeTopicFromCache(topic);
+
+                                    if (dispatchRateLimiter.isPresent()) {
+                                        dispatchRateLimiter.get().close();
+                                    }
+
+                                    if (subscribeRateLimiter.isPresent()) {
+                                        subscribeRateLimiter.get().close();
+                                    }
+
                                     log.info("[{}] Topic deleted", topic);
                                     deleteFuture.complete(null);
                                 }
@@ -850,6 +859,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         ctrl.close();
                     }
 
+                    if (dispatchRateLimiter.isPresent()) {
+                        dispatchRateLimiter.get().close();
+                    }
+
+                    if (subscribeRateLimiter.isPresent()) {
+                        subscribeRateLimiter.get().close();
+                    }
+
                     log.info("[{}] Topic closed", topic);
                     closeFuture.complete(null);
                 }
@@ -861,14 +878,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     closeFuture.complete(null);
                 }
             }, null);
-
-            if (dispatchRateLimiter.isPresent()) {
-                dispatchRateLimiter.get().close();
-            }
-            if (subscribeRateLimiter.isPresent()) {
-                subscribeRateLimiter.get().close();
-            }
-
         }).exceptionally(exception -> {
             log.error("[{}] Error closing topic", topic, exception);
             isFenced = false;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index cc0f46e..6d139f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -847,6 +848,51 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 10000)
+    public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace = "my-property/throttling_ns";
+        final String topicName = "persistent://" + namespace + "/closingRateLimiter" + subscription.name();
+        final String subName = "mySubscription" + subscription.name();
+
+        DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setDispatchRate(namespace, dispatchRate);
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscriptionType(subscription).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        final int numProducedMessages = 10;
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            Message<byte[]> msg = consumer.receive();
+            consumer.acknowledge(msg);
+        }
+
+        Assert.assertTrue(topic.getDispatchRateLimiter().isPresent());
+        DispatchRateLimiter dispatchRateLimiter = topic.getDispatchRateLimiter().get();
+
+        producer.close();
+        consumer.unsubscribe();
+        consumer.close();
+        topic.close().get();
+
+        // Make sure that the rate limiter is closed
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
         Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
         statsUpdaterField.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index cbcef08..7ed88f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.slf4j.Logger;
@@ -550,4 +551,50 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(dataProvider = "subscriptions", timeOut = 10000)
+    public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace = "my-property/throttling_ns";
+        final String topicName = "persistent://" + namespace + "/closingSubRateLimiter" + subscription.name();
+        final String subName = "mySubscription" + subscription.name();
+
+        DispatchRate dispatchRate = new DispatchRate(10, 1024, 1);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscriptionType(subscription).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        PersistentSubscription sub = topic.getSubscription(subName);
+
+        final int numProducedMessages = 10;
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            Message<byte[]> msg = consumer.receive();
+            consumer.acknowledge(msg);
+        }
+
+        Dispatcher dispatcher = sub.getDispatcher();
+        Assert.assertTrue(dispatcher.getRateLimiter().isPresent());
+        DispatchRateLimiter dispatchRateLimiter = dispatcher.getRateLimiter().get();
+
+        producer.close();
+        consumer.close();
+        sub.disconnect().get();
+
+        // Make sure that the rate limiter is closed
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
+        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }