You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 16:36:32 UTC

[incubator-pulsar] branch master updated: Delete inactive subscriptions automatically (#1352)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 762036c  Delete inactive subscriptions automatically (#1352)
762036c is described below

commit 762036c36f071d0aca148bf1d333b3df0da66191
Author: yush1ga <y....@gmail.com>
AuthorDate: Wed May 2 01:36:30 2018 +0900

    Delete inactive subscriptions automatically (#1352)
    
    * Delete inactive subscriptions automatically
    
    * Addressed PR comments
    
    * Add subscriptionExpiryCheckIntervalInMinutes
    
    * Add lastActive for ManagedCursorInfo
---
 conf/broker.conf                                   |   7 +
 conf/standalone.conf                               |   7 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  13 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  26 +++-
 .../bookkeeper/mledger/proto/MLDataFormats.java    |  65 +++++++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |   2 +
 .../mledger/impl/ManagedCursorContainerTest.java   |  10 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  21 +++
 .../broker/admin/v1/NonPersistentTopics.java       |  10 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  10 +-
 .../pulsar/broker/service/BrokerService.java       | 166 +++++++++------------
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +
 .../service/persistent/PersistentSubscription.java |   2 +
 .../broker/service/persistent/PersistentTopic.java |  13 ++
 .../broker/stats/BookieClientStatsGenerator.java   |  15 +-
 16 files changed, 252 insertions(+), 122 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index dc7ca04..893a3e2 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -76,6 +76,13 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# How long to delete inactive subscriptions from last consuming
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently to proactively check and purge expired subscription
+subscriptionExpiryCheckIntervalInMinutes=5
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f5c9546..bc6dc10 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -69,6 +69,13 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# How long to delete inactive subscriptions from last consuming
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently to proactively check and purge expired subscription
+subscriptionExpiryCheckIntervalInMinutes=5
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index f6793b4..186a450 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -58,6 +58,19 @@ public interface ManagedCursor {
     String getName();
 
     /**
+     * Get the last active time of the cursor.
+     *
+     * @return the last active time of the cursor
+     */
+    long getLastActive();
+
+    /**
+     * Update the last active time of the cursor
+     *
+     */
+    void updateLastActive();
+
+    /**
      * Return any properties that were associated with the last stored position.
      */
     Map<String, Long> getProperties();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 695f22d..194e8c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -158,6 +158,9 @@ public class ManagedCursorImpl implements ManagedCursor {
     private long lastLedgerSwitchTimestamp;
     private final Clock clock;
 
+    // The last active time (Unix time, milliseconds) of the cursor
+    private long lastActive;
+
     enum State {
         Uninitialized, // Cursor is being initialized
         NoLedger, // There is no metadata ledger open for writing
@@ -189,6 +192,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
         WAITING_READ_OP_UPDATER.set(this, null);
         this.clock = config.getClock();
+        this.lastActive = this.clock.millis();
         this.lastLedgerSwitchTimestamp = this.clock.millis();
 
         if (config.getThrottleMarkDelete() > 0.0) {
@@ -216,6 +220,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
 
                 cursorLedgerStat = stat;
+                lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
 
                 if (info.getCursorsLedgerId() == -1L) {
                     // There is no cursor ledger to read the last position from. It means the cursor has been properly
@@ -1280,7 +1285,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // markDelete-position and clear out deletedMsgSet
         markDeletePosition = PositionImpl.get(newMarkDeletePosition);
         individualDeletedMessages.remove(Range.atMost(markDeletePosition));
-        
+
         if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
             // If the position that is mark-deleted is past the read position, it
             // means that the client has skipped some entries. We need to move
@@ -1307,7 +1312,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             final MarkDeleteCallback callback, final Object ctx) {
         checkNotNull(position);
         checkArgument(position instanceof PositionImpl);
-        
+
         if (STATE_UPDATER.get(this) == State.Closed) {
             callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
             return;
@@ -1328,7 +1333,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
         }
         PositionImpl newPosition = (PositionImpl) position;
-        
+
         if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug(
@@ -1541,7 +1546,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             for (Position pos : positions) {
                 PositionImpl position  = (PositionImpl) checkNotNull(pos);
-                
+
                 if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                     if (log.isDebugEnabled()) {
                         log.debug(
@@ -1693,6 +1698,16 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
+    public long getLastActive() {
+        return lastActive;
+    }
+
+    @Override
+    public void updateLastActive() {
+        lastActive = System.currentTimeMillis();
+    }
+
+    @Override
     public boolean isDurable() {
         return true;
     }
@@ -1837,7 +1852,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
                 .setCursorsLedgerId(cursorsLedgerId) //
                 .setMarkDeleteLedgerId(position.getLedgerId()) //
-                .setMarkDeleteEntryId(position.getEntryId()); //
+                .setMarkDeleteEntryId(position.getEntryId()) //
+                .setLastActive(lastActive); //
 
         info.addAllProperties(buildPropertiesMap(properties));
         if (persistIndividualDeletedMessageRanges) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
index 73ba1da..fc607b9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
@@ -4428,6 +4428,10 @@ public final class MLDataFormats {
         getPropertiesOrBuilderList();
     org.apache.bookkeeper.mledger.proto.MLDataFormats.LongPropertyOrBuilder getPropertiesOrBuilder(
         int index);
+    
+    // optional int64 lastActive = 6;
+    boolean hasLastActive();
+    long getLastActive();
   }
   public static final class ManagedCursorInfo extends
       com.google.protobuf.GeneratedMessage
@@ -4530,12 +4534,23 @@ public final class MLDataFormats {
       return properties_.get(index);
     }
     
+    // optional int64 lastActive = 6;
+    public static final int LASTACTIVE_FIELD_NUMBER = 6;
+    private long lastActive_;
+    public boolean hasLastActive() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getLastActive() {
+      return lastActive_;
+    }
+    
     private void initFields() {
       cursorsLedgerId_ = 0L;
       markDeleteLedgerId_ = 0L;
       markDeleteEntryId_ = 0L;
       individualDeletedMessages_ = java.util.Collections.emptyList();
       properties_ = java.util.Collections.emptyList();
+      lastActive_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4580,6 +4595,9 @@ public final class MLDataFormats {
       for (int i = 0; i < properties_.size(); i++) {
         output.writeMessage(5, properties_.get(i));
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(6, lastActive_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -4609,6 +4627,10 @@ public final class MLDataFormats {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, properties_.get(i));
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, lastActive_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4753,6 +4775,8 @@ public final class MLDataFormats {
         } else {
           propertiesBuilder_.clear();
         }
+        lastActive_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -4821,6 +4845,10 @@ public final class MLDataFormats {
         } else {
           result.properties_ = propertiesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastActive_ = lastActive_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4898,6 +4926,9 @@ public final class MLDataFormats {
             }
           }
         }
+        if (other.hasLastActive()) {
+          setLastActive(other.getLastActive());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4972,6 +5003,11 @@ public final class MLDataFormats {
               addProperties(subBuilder.buildPartial());
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              lastActive_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -5413,6 +5449,27 @@ public final class MLDataFormats {
         return propertiesBuilder_;
       }
       
+      // optional int64 lastActive = 6;
+      private long lastActive_ ;
+      public boolean hasLastActive() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getLastActive() {
+        return lastActive_;
+      }
+      public Builder setLastActive(long value) {
+        bitField0_ |= 0x00000020;
+        lastActive_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearLastActive() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        lastActive_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:ManagedCursorInfo)
     }
     
@@ -5491,13 +5548,13 @@ public final class MLDataFormats {
       "Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" +
       "itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" +
       "dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " +
-      "\002(\t\022\r\n\005value\030\002 \002(\003\"\270\001\n\021ManagedCursorInfo" +
+      "\002(\t\022\r\n\005value\030\002 \002(\003\"\314\001\n\021ManagedCursorInfo" +
       "\022\027\n\017cursorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteL",
       "edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003" +
       "\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" +
       "ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" +
-      "pertyB\'\n#org.apache.bookkeeper.mledger.p" +
-      "rotoH\001"
+      "perty\022\022\n\nlastActive\030\006 \001(\003B\'\n#org.apache." +
+      "bookkeeper.mledger.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5565,7 +5622,7 @@ public final class MLDataFormats {
           internal_static_ManagedCursorInfo_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ManagedCursorInfo_descriptor,
-              new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", },
+              new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", "LastActive", },
               org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.class,
               org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.Builder.class);
           return null;
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 29f465c..0d5ad3a 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -86,4 +86,6 @@ message ManagedCursorInfo {
 	// Additional custom properties associated with
 	// the current cursor position
 	repeated LongProperty properties = 5;
+
+  optional int64 lastActive = 6;
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index de98b60..c9021ae 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -122,6 +122,16 @@ public class ManagedCursorContainerTest {
             return name;
         }
 
+        @Override
+        public long getLastActive() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public void updateLastActive() {
+            // no-op
+        }
+
         public String toString() {
             return String.format("%s=%s", name, position);
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5cf31c1..9a69fc3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -93,6 +93,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int messageExpiryCheckIntervalInMinutes = 5;
     // How long to delay rewinding cursor and dispatching messages when active consumer is changed
     private int activeConsumerFailoverDelayTimeMillis = 1000;
+    // How long to delete inactive subscriptions from last consuming
+    // When it is 0, inactive subscriptions are not deleted automatically
+    private long subscriptionExpirationTimeMinutes = 0;
+    // How frequently to proactively check and purge expired subscription
+    private long subscriptionExpiryCheckIntervalInMinutes = 5;
 
     // Set the default behavior for message deduplication in the broker
     // This can be overridden per-namespace. If enabled, broker will reject
@@ -681,6 +686,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis;
     }
 
+    public long getSubscriptionExpirationTimeMinutes() {
+        return subscriptionExpirationTimeMinutes;
+    }
+
+    public void setSubscriptionExpirationTimeMinutes(long subscriptionExpirationTimeMinutes) {
+        this.subscriptionExpirationTimeMinutes = subscriptionExpirationTimeMinutes;
+    }
+
+    public long getSubscriptionExpiryCheckIntervalInMinutes() {
+        return subscriptionExpiryCheckIntervalInMinutes;
+    }
+
+    public void setSubscriptionExpiryCheckIntervalInMinutes(long subscriptionExpiryCheckIntervalInMinutes) {
+        this.subscriptionExpiryCheckIntervalInMinutes = subscriptionExpiryCheckIntervalInMinutes;
+    }
+
     public boolean isClientLibraryVersionCheckEnabled() {
         return clientLibraryVersionCheckEnabled;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index e9fbe0a..f73a829 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -236,12 +236,10 @@ public class NonPersistentTopics extends PersistentTopics {
         NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
         try {
             final List<String> topicList = Lists.newArrayList();
-            pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                if (BrokerService.extractTopic(topicFuture).isPresent()) {
-                    TopicName topicName = TopicName.get(name);
-                    if (nsBundle.includes(topicName)) {
-                        topicList.add(name);
-                    }
+            pulsar().getBrokerService().forEachTopic(topic -> {
+                TopicName topicName = TopicName.get(topic.getName());
+                if (nsBundle.includes(topicName)) {
+                    topicList.add(topic.getName());
                 }
             });
             return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 8f616d2..9d49ad1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -227,12 +227,10 @@ public class NonPersistentTopics extends PersistentTopics {
         NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
         try {
             final List<String> topicList = Lists.newArrayList();
-            pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                if (BrokerService.extractTopic(topicFuture).isPresent()) {
-                    TopicName topicName = TopicName.get(name);
-                    if (nsBundle.includes(topicName)) {
-                        topicList.add(name);
-                    }
+            pulsar().getBrokerService().forEachTopic(topic -> {
+                TopicName topicName = TopicName.get(topic.getName());
+                if (nsBundle.includes(topicName)) {
+                    topicList.add(topic.getName());
                 }
             });
             return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad709db..43f5000 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -109,9 +109,9 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -324,10 +324,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
 
         // Deduplication info checker
-        long intervalInSeconds = TimeUnit.MINUTES
+        long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
                 .toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3;
-        inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), intervalInSeconds,
-                intervalInSeconds, TimeUnit.SECONDS);
+        inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), duplicationCheckerIntervalInSeconds,
+                duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
+
+        // Inactive subscriber checker
+        if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) {
+            long subscriptionExpiryCheckIntervalInSeconds =
+                    TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
+            inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
+                    subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
+        }
     }
 
     void startMessageExpiryMonitor() {
@@ -813,28 +821,29 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public void checkGC(int gcIntervalInSeconds) {
-        topics.forEach((n, t) -> {
-            Optional<Topic> topic = extractTopic(t);
-            if (topic.isPresent()) {
-                topic.get().checkGC(gcIntervalInSeconds);
-            }
-        });
+        forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
     }
 
     public void checkMessageExpiry() {
-        topics.forEach((n, t) -> {
-            Optional<Topic> topic = extractTopic(t);
-            if (topic.isPresent()) {
-                topic.get().checkMessageExpiry();
-            }
-        });
+        forEachTopic(Topic::checkMessageExpiry);
     }
 
     public void checkMessageDeduplicationInfo() {
+        forEachTopic(Topic::checkMessageDeduplicationInfo);
+    }
+
+    public void checkInactiveSubscriptions() {
+        forEachTopic(Topic::checkInactiveSubscriptions);
+    }
+
+    /**
+     * Iterates over all loaded topics in the broker
+     */
+    public void forEachTopic(Consumer<Topic> consumer) {
         topics.forEach((n, t) -> {
             Optional<Topic> topic = extractTopic(t);
             if (topic.isPresent()) {
-                topic.get().checkMessageDeduplicationInfo();
+                consumer.accept(topic.get());
             }
         });
     }
@@ -866,28 +875,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public void monitorBacklogQuota() {
-        topics.forEach((n, t) -> {
-            try {
-                Optional<Topic> optionalTopic = extractTopic(t);
-                if (optionalTopic.isPresent() && optionalTopic.get() instanceof PersistentTopic) {
-                    PersistentTopic topic = (PersistentTopic) optionalTopic.get();
-                    if (isBacklogExceeded(topic)) {
-                        getBacklogQuotaManager().handleExceededBacklogQuota(topic);
-                    } else if (topic == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("topic is null ");
-                        }
-                    } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("quota not exceeded for [{}]", topic.getName());
-                        }
+        forEachTopic(topic -> {
+            if (topic instanceof PersistentTopic) {
+                PersistentTopic persistentTopic = (PersistentTopic) topic;
+                if (isBacklogExceeded(persistentTopic)) {
+                    getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("quota not exceeded for [{}]", topic.getName());
                     }
                 }
-            } catch (Exception xle) {
-                log.warn("Backlog quota monitoring encountered :" + xle.getLocalizedMessage());
             }
         });
-
     }
 
     void checkTopicNsOwnership(final String topic) throws RuntimeException {
@@ -1035,12 +1034,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     public Map<String, TopicStats> getTopicStats() {
         HashMap<String, TopicStats> stats = new HashMap<>();
-        topics.forEach((name, topicFuture) -> {
-            Optional<Topic> topic = extractTopic(topicFuture);
-            if (topic.isPresent()) {
-                stats.put(name, topic.get().getStats());
-            }
-        });
+
+        forEachTopic(topic -> stats.put(topic.getName(), topic.getStats()));
+
         return stats;
     }
 
@@ -1130,11 +1126,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private void updateTopicMessageDispatchRate() {
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-
-                if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                    PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+            forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
                     // it first checks namespace-policy rate and if not present then applies broker-config
                     persistentTopic.getDispatchRateLimiter().updateDispatchRate();
                 }
@@ -1145,21 +1139,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private void updateSubscriptionMessageDispatchRate() {
         this.pulsar().getExecutor().submit(() -> {
             // update message-rate for each topic subscription
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-
-                if (topic.isPresent()) {
-                    topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
-                        if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
-                            ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
-                                    .getDispatchRateLimiter().updateDispatchRate();
-                        } else if (persistentSubscription
-                                .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
-                            ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
-                                    .getDispatchRateLimiter().updateDispatchRate();
-                        }
-                    });
-                }
+            forEachTopic(topic -> {
+                topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+                    if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+                        ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
+                                .getDispatchRateLimiter().updateDispatchRate();
+                    } else if (persistentSubscription
+                            .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
+                        ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
+                                .getDispatchRateLimiter().updateDispatchRate();
+                    }
+                });
             });
         });
     }
@@ -1167,22 +1157,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private void updateManagedLedgerConfig() {
         this.pulsar().getExecutor().execute(() -> {
             // update managed-ledger config of each topic
-            topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    String topicName = null;
-                    try {
-                        Optional<Topic> topic = extractTopic(topicFuture);
-
-                        if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                            PersistentTopic persistentTopic = (PersistentTopic) topic.get();
-                            topicName = persistentTopic.getName();
-                            // update skipNonRecoverableLedger configuration
-                            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
-                                    pulsar.getConfiguration().isAutoSkipNonRecoverableData());
-                        }
-                    } catch (Exception e) {
-                        log.warn("[{}] failed to update managed-ledger config", topicName, e);
+
+            forEachTopic(topic -> {
+                try {
+                    if (topic instanceof PersistentTopic) {
+                        PersistentTopic persistentTopic = (PersistentTopic) topic;
+                        // update skipNonRecoverableLedger configuration
+                        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+                                pulsar.getConfiguration().isAutoSkipNonRecoverableData());
                     }
+                } catch (Exception e) {
+                    log.warn("[{}] failed to update managed-ledger config", topic.getName(), e);
                 }
             });
         });
@@ -1437,23 +1422,20 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private void blockDispatchersWithLargeUnAckMessages() {
         lock.readLock().lock();
         try {
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-                if (topic.isPresent()) {
-                    topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
-                        if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
-                            PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
-                                    .getDispatcher();
-                            int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
-                            if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
-                                log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
-                                        dispatcher.getName(), dispatcher.getTotalUnackedMessages());
-                                dispatcher.blockDispatcherOnUnackedMsgs();
-                                blockedDispatchers.add(dispatcher);
-                            }
+            forEachTopic(topic -> {
+                topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+                    if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+                        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
+                                .getDispatcher();
+                        int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
+                        if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
+                            log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
+                                    dispatcher.getName(), dispatcher.getTotalUnackedMessages());
+                            dispatcher.blockDispatcherOnUnackedMsgs();
+                            blockedDispatchers.add(dispatcher);
                         }
-                    });
-                }
+                    }
+                });
             });
         } finally {
             lock.readLock().unlock();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fbf3c65..fccc75b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -101,6 +101,8 @@ public interface Topic {
 
     void checkGC(int gcInterval);
 
+    void checkInactiveSubscriptions();
+
     void checkMessageExpiry();
 
     void checkMessageDeduplicationInfo();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e45016c..cddfea3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -913,6 +913,11 @@ public class NonPersistentTopic implements Topic {
     }
 
     @Override
+    public void checkInactiveSubscriptions() {
+        // no-op
+    }
+
+    @Override
     public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 65f5f97..bdace55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -96,6 +96,7 @@ public class PersistentSubscription implements Subscription {
 
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+        cursor.updateLastActive();
         if (IS_FENCED_UPDATER.get(this) == TRUE) {
             log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
             throw new SubscriptionFencedException("Subscription is fenced");
@@ -144,6 +145,7 @@ public class PersistentSubscription implements Subscription {
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        cursor.updateLastActive();
         if (dispatcher != null) {
             dispatcher.removeConsumer(consumer);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c59bd9..d56bf55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1462,6 +1462,19 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
     }
 
+    @Override
+    public void checkInactiveSubscriptions() {
+        final long expirationTime = TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
+        if (expirationTime <= 0) return;
+        subscriptions.forEach((subName, sub) -> {
+            if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) return;
+            if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTime) {
+                sub.delete().thenAccept(
+                        v -> log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName));
+            }
+        });
+    }
+
     /**
      * Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
      * marked as inactive.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index 914a25c..697f16a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import com.google.common.collect.Maps;
+
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.TopicName;
 
-import com.google.common.collect.Maps;
-
 /**
  */
 public class BookieClientStatsGenerator {
@@ -49,10 +46,10 @@ public class BookieClientStatsGenerator {
 
     private Map<String, Map<String, PendingBookieOpsStats>> generate() throws Exception {
         if (pulsar.getBrokerService() != null && pulsar.getBrokerService().getTopics() != null) {
-            pulsar.getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                Optional<Topic> topic = BrokerService.extractTopic(topicFuture);
-                if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                    PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+
+            pulsar.getBrokerService().forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
                     TopicName topicName = TopicName.get(persistentTopic.getName());
                     put(topicName, persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
                 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.