You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/01 16:36:32 UTC

[GitHub] merlimat closed pull request #1352: Delete inactive subscriptions automatically

merlimat closed pull request #1352: Delete inactive subscriptions automatically
URL: https://github.com/apache/incubator-pulsar/pull/1352
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 6f502a59c6..3fdce3dd08 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -76,6 +76,13 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# Time (in minutes) after its last consumption that an inactive subscription is deleted
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently (in minutes) to proactively check for and purge expired subscriptions
+subscriptionExpiryCheckIntervalInMinutes=5
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2beddf50da..9c33b04671 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -69,6 +69,13 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# Time (in minutes) after its last consumption that an inactive subscription is deleted
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently (in minutes) to proactively check for and purge expired subscriptions
+subscriptionExpiryCheckIntervalInMinutes=5
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index f6793b405e..186a450c87 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -57,6 +57,19 @@
      */
     String getName();
 
+    /**
+     * Get the last active time of the cursor.
+     *
+     * @return the last active time of the cursor
+     */
+    long getLastActive();
+
+    /**
+     * Update the last active time of the cursor.
+     *
+     */
+    void updateLastActive();
+
     /**
      * Return any properties that were associated with the last stored position.
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 695f22dcd6..194e8c0e6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -158,6 +158,9 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
     private long lastLedgerSwitchTimestamp;
     private final Clock clock;
 
+    // The last active time (Unix time, milliseconds) of the cursor
+    private long lastActive;
+
     enum State {
         Uninitialized, // Cursor is being initialized
         NoLedger, // There is no metadata ledger open for writing
@@ -189,6 +192,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
         RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
         WAITING_READ_OP_UPDATER.set(this, null);
         this.clock = config.getClock();
+        this.lastActive = this.clock.millis();
         this.lastLedgerSwitchTimestamp = this.clock.millis();
 
         if (config.getThrottleMarkDelete() > 0.0) {
@@ -216,6 +220,7 @@ void recover(final VoidCallback callback) {
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
 
                 cursorLedgerStat = stat;
+                lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
 
                 if (info.getCursorsLedgerId() == -1L) {
                     // There is no cursor ledger to read the last position from. It means the cursor has been properly
@@ -1280,7 +1285,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
         // markDelete-position and clear out deletedMsgSet
         markDeletePosition = PositionImpl.get(newMarkDeletePosition);
         individualDeletedMessages.remove(Range.atMost(markDeletePosition));
-        
+
         if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
             // If the position that is mark-deleted is past the read position, it
             // means that the client has skipped some entries. We need to move
@@ -1307,7 +1312,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
             final MarkDeleteCallback callback, final Object ctx) {
         checkNotNull(position);
         checkArgument(position instanceof PositionImpl);
-        
+
         if (STATE_UPDATER.get(this) == State.Closed) {
             callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
             return;
@@ -1328,7 +1333,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
             log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
         }
         PositionImpl newPosition = (PositionImpl) position;
-        
+
         if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug(
@@ -1541,7 +1546,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
 
             for (Position pos : positions) {
                 PositionImpl position  = (PositionImpl) checkNotNull(pos);
-                
+
                 if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                     if (log.isDebugEnabled()) {
                         log.debug(
@@ -1692,6 +1697,16 @@ public String getName() {
         return name;
     }
 
+    @Override
+    public long getLastActive() {
+        return lastActive;
+    }
+
+    @Override
+    public void updateLastActive() {
+        lastActive = System.currentTimeMillis();
+    }
+
     @Override
     public boolean isDurable() {
         return true;
@@ -1837,7 +1852,8 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
         ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
                 .setCursorsLedgerId(cursorsLedgerId) //
                 .setMarkDeleteLedgerId(position.getLedgerId()) //
-                .setMarkDeleteEntryId(position.getEntryId()); //
+                .setMarkDeleteEntryId(position.getEntryId()) //
+                .setLastActive(lastActive); //
 
         info.addAllProperties(buildPropertiesMap(properties));
         if (persistIndividualDeletedMessageRanges) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
index 73ba1dab56..fc607b944d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
@@ -4428,6 +4428,10 @@ public Builder clearValue() {
         getPropertiesOrBuilderList();
     org.apache.bookkeeper.mledger.proto.MLDataFormats.LongPropertyOrBuilder getPropertiesOrBuilder(
         int index);
+    
+    // optional int64 lastActive = 6;
+    boolean hasLastActive();
+    long getLastActive();
   }
   public static final class ManagedCursorInfo extends
       com.google.protobuf.GeneratedMessage
@@ -4530,12 +4534,23 @@ public int getPropertiesCount() {
       return properties_.get(index);
     }
     
+    // optional int64 lastActive = 6;
+    public static final int LASTACTIVE_FIELD_NUMBER = 6;
+    private long lastActive_;
+    public boolean hasLastActive() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getLastActive() {
+      return lastActive_;
+    }
+    
     private void initFields() {
       cursorsLedgerId_ = 0L;
       markDeleteLedgerId_ = 0L;
       markDeleteEntryId_ = 0L;
       individualDeletedMessages_ = java.util.Collections.emptyList();
       properties_ = java.util.Collections.emptyList();
+      lastActive_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4580,6 +4595,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
       for (int i = 0; i < properties_.size(); i++) {
         output.writeMessage(5, properties_.get(i));
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt64(6, lastActive_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -4609,6 +4627,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, properties_.get(i));
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, lastActive_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4753,6 +4775,8 @@ public Builder clear() {
         } else {
           propertiesBuilder_.clear();
         }
+        lastActive_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -4821,6 +4845,10 @@ public Builder clone() {
         } else {
           result.properties_ = propertiesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.lastActive_ = lastActive_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4898,6 +4926,9 @@ public Builder mergeFrom(org.apache.bookkeeper.mledger.proto.MLDataFormats.Manag
             }
           }
         }
+        if (other.hasLastActive()) {
+          setLastActive(other.getLastActive());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4972,6 +5003,11 @@ public Builder mergeFrom(
               addProperties(subBuilder.buildPartial());
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              lastActive_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -5413,6 +5449,27 @@ public Builder removeProperties(int index) {
         return propertiesBuilder_;
       }
       
+      // optional int64 lastActive = 6;
+      private long lastActive_ ;
+      public boolean hasLastActive() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getLastActive() {
+        return lastActive_;
+      }
+      public Builder setLastActive(long value) {
+        bitField0_ |= 0x00000020;
+        lastActive_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearLastActive() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        lastActive_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:ManagedCursorInfo)
     }
     
@@ -5491,13 +5548,13 @@ public Builder removeProperties(int index) {
       "Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" +
       "itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" +
       "dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " +
-      "\002(\t\022\r\n\005value\030\002 \002(\003\"\270\001\n\021ManagedCursorInfo" +
+      "\002(\t\022\r\n\005value\030\002 \002(\003\"\314\001\n\021ManagedCursorInfo" +
       "\022\027\n\017cursorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteL",
       "edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003" +
       "\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" +
       "ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" +
-      "pertyB\'\n#org.apache.bookkeeper.mledger.p" +
-      "rotoH\001"
+      "perty\022\022\n\nlastActive\030\006 \001(\003B\'\n#org.apache." +
+      "bookkeeper.mledger.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5565,7 +5622,7 @@ public Builder removeProperties(int index) {
           internal_static_ManagedCursorInfo_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ManagedCursorInfo_descriptor,
-              new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", },
+              new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", "LastActive", },
               org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.class,
               org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.Builder.class);
           return null;
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 29f465c9c2..0d5ad3a17c 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -86,4 +86,6 @@ message ManagedCursorInfo {
 	// Additional custom properties associated with
 	// the current cursor position
 	repeated LongProperty properties = 5;
+
+  optional int64 lastActive = 6;
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index de98b601a0..c9021ae22f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -122,6 +122,16 @@ public String getName() {
             return name;
         }
 
+        @Override
+        public long getLastActive() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public void updateLastActive() {
+            // no-op
+        }
+
         public String toString() {
             return String.format("%s=%s", name, position);
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e6c37ba162..dcd45eff41 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -89,6 +89,11 @@
     private int messageExpiryCheckIntervalInMinutes = 5;
     // How long to delay rewinding cursor and dispatching messages when active consumer is changed
     private int activeConsumerFailoverDelayTimeMillis = 1000;
+    // Time (in minutes) after its last consumption that an inactive subscription is deleted
+    // When it is 0, inactive subscriptions are not deleted automatically
+    private long subscriptionExpirationTimeMinutes = 0;
+    // How frequently (in minutes) to proactively check for and purge expired subscriptions
+    private long subscriptionExpiryCheckIntervalInMinutes = 5;
 
     // Set the default behavior for message deduplication in the broker
     // This can be overridden per-namespace. If enabled, broker will reject
@@ -657,6 +662,22 @@ public void setActiveConsumerFailoverDelayTimeMillis(int activeConsumerFailoverD
         this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis;
     }
 
+    public long getSubscriptionExpirationTimeMinutes() {
+        return subscriptionExpirationTimeMinutes;
+    }
+
+    public void setSubscriptionExpirationTimeMinutes(long subscriptionExpirationTimeMinutes) {
+        this.subscriptionExpirationTimeMinutes = subscriptionExpirationTimeMinutes;
+    }
+
+    public long getSubscriptionExpiryCheckIntervalInMinutes() {
+        return subscriptionExpiryCheckIntervalInMinutes;
+    }
+
+    public void setSubscriptionExpiryCheckIntervalInMinutes(long subscriptionExpiryCheckIntervalInMinutes) {
+        this.subscriptionExpiryCheckIntervalInMinutes = subscriptionExpiryCheckIntervalInMinutes;
+    }
+
     public boolean isClientLibraryVersionCheckEnabled() {
         return clientLibraryVersionCheckEnabled;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index e9fbe0a0c0..f73a829926 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -236,12 +236,10 @@ public void unloadTopic(@PathParam("property") String property, @PathParam("clus
         NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
         try {
             final List<String> topicList = Lists.newArrayList();
-            pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                if (BrokerService.extractTopic(topicFuture).isPresent()) {
-                    TopicName topicName = TopicName.get(name);
-                    if (nsBundle.includes(topicName)) {
-                        topicList.add(name);
-                    }
+            pulsar().getBrokerService().forEachTopic(topic -> {
+                TopicName topicName = TopicName.get(topic.getName());
+                if (nsBundle.includes(topicName)) {
+                    topicList.add(topic.getName());
                 }
             });
             return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 8f616d25b1..9d49ad1820 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -227,12 +227,10 @@ public void unloadTopic(@PathParam("tenant") String tenant, @PathParam("namespac
         NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
         try {
             final List<String> topicList = Lists.newArrayList();
-            pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                if (BrokerService.extractTopic(topicFuture).isPresent()) {
-                    TopicName topicName = TopicName.get(name);
-                    if (nsBundle.includes(topicName)) {
-                        topicList.add(name);
-                    }
+            pulsar().getBrokerService().forEachTopic(topic -> {
+                TopicName topicName = TopicName.get(topic.getName());
+                if (nsBundle.includes(topicName)) {
+                    topicList.add(topic.getName());
                 }
             });
             return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad709db65a..43f50009f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -109,9 +109,9 @@
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -324,10 +324,18 @@ void startInactivityMonitor() {
         }
 
         // Deduplication info checker
-        long intervalInSeconds = TimeUnit.MINUTES
+        long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
                 .toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3;
-        inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), intervalInSeconds,
-                intervalInSeconds, TimeUnit.SECONDS);
+        inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), duplicationCheckerIntervalInSeconds,
+                duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
+
+        // Inactive subscription checker
+        if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) {
+            long subscriptionExpiryCheckIntervalInSeconds =
+                    TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
+            inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
+                    subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
+        }
     }
 
     void startMessageExpiryMonitor() {
@@ -813,28 +821,29 @@ public Semaphore getLookupRequestSemaphore() {
     }
 
     public void checkGC(int gcIntervalInSeconds) {
-        topics.forEach((n, t) -> {
-            Optional<Topic> topic = extractTopic(t);
-            if (topic.isPresent()) {
-                topic.get().checkGC(gcIntervalInSeconds);
-            }
-        });
+        forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
     }
 
     public void checkMessageExpiry() {
-        topics.forEach((n, t) -> {
-            Optional<Topic> topic = extractTopic(t);
-            if (topic.isPresent()) {
-                topic.get().checkMessageExpiry();
-            }
-        });
+        forEachTopic(Topic::checkMessageExpiry);
     }
 
     public void checkMessageDeduplicationInfo() {
+        forEachTopic(Topic::checkMessageDeduplicationInfo);
+    }
+
+    public void checkInactiveSubscriptions() {
+        forEachTopic(Topic::checkInactiveSubscriptions);
+    }
+
+    /**
+     * Iterates over all loaded topics in the broker
+     */
+    public void forEachTopic(Consumer<Topic> consumer) {
         topics.forEach((n, t) -> {
             Optional<Topic> topic = extractTopic(t);
             if (topic.isPresent()) {
-                topic.get().checkMessageDeduplicationInfo();
+                consumer.accept(topic.get());
             }
         });
     }
@@ -866,28 +875,18 @@ public boolean isBacklogExceeded(PersistentTopic topic) {
     }
 
     public void monitorBacklogQuota() {
-        topics.forEach((n, t) -> {
-            try {
-                Optional<Topic> optionalTopic = extractTopic(t);
-                if (optionalTopic.isPresent() && optionalTopic.get() instanceof PersistentTopic) {
-                    PersistentTopic topic = (PersistentTopic) optionalTopic.get();
-                    if (isBacklogExceeded(topic)) {
-                        getBacklogQuotaManager().handleExceededBacklogQuota(topic);
-                    } else if (topic == null) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("topic is null ");
-                        }
-                    } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("quota not exceeded for [{}]", topic.getName());
-                        }
+        forEachTopic(topic -> {
+            if (topic instanceof PersistentTopic) {
+                PersistentTopic persistentTopic = (PersistentTopic) topic;
+                if (isBacklogExceeded(persistentTopic)) {
+                    getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("quota not exceeded for [{}]", topic.getName());
                     }
                 }
-            } catch (Exception xle) {
-                log.warn("Backlog quota monitoring encountered :" + xle.getLocalizedMessage());
             }
         });
-
     }
 
     void checkTopicNsOwnership(final String topic) throws RuntimeException {
@@ -1035,12 +1034,9 @@ public String generateUniqueProducerName() {
 
     public Map<String, TopicStats> getTopicStats() {
         HashMap<String, TopicStats> stats = new HashMap<>();
-        topics.forEach((name, topicFuture) -> {
-            Optional<Topic> topic = extractTopic(topicFuture);
-            if (topic.isPresent()) {
-                stats.put(name, topic.get().getStats());
-            }
-        });
+
+        forEachTopic(topic -> stats.put(topic.getName(), topic.getStats()));
+
         return stats;
     }
 
@@ -1130,11 +1126,9 @@ private void updateConfigurationAndRegisterListeners() {
     private void updateTopicMessageDispatchRate() {
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-
-                if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                    PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+            forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
                     // it first checks namespace-policy rate and if not present then applies broker-config
                     persistentTopic.getDispatchRateLimiter().updateDispatchRate();
                 }
@@ -1145,21 +1139,17 @@ private void updateTopicMessageDispatchRate() {
     private void updateSubscriptionMessageDispatchRate() {
         this.pulsar().getExecutor().submit(() -> {
             // update message-rate for each topic subscription
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-
-                if (topic.isPresent()) {
-                    topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
-                        if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
-                            ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
-                                    .getDispatchRateLimiter().updateDispatchRate();
-                        } else if (persistentSubscription
-                                .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
-                            ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
-                                    .getDispatchRateLimiter().updateDispatchRate();
-                        }
-                    });
-                }
+            forEachTopic(topic -> {
+                topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+                    if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+                        ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
+                                .getDispatchRateLimiter().updateDispatchRate();
+                    } else if (persistentSubscription
+                            .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
+                        ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
+                                .getDispatchRateLimiter().updateDispatchRate();
+                    }
+                });
             });
         });
     }
@@ -1167,22 +1157,17 @@ private void updateSubscriptionMessageDispatchRate() {
     private void updateManagedLedgerConfig() {
         this.pulsar().getExecutor().execute(() -> {
             // update managed-ledger config of each topic
-            topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    String topicName = null;
-                    try {
-                        Optional<Topic> topic = extractTopic(topicFuture);
-
-                        if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                            PersistentTopic persistentTopic = (PersistentTopic) topic.get();
-                            topicName = persistentTopic.getName();
-                            // update skipNonRecoverableLedger configuration
-                            persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
-                                    pulsar.getConfiguration().isAutoSkipNonRecoverableData());
-                        }
-                    } catch (Exception e) {
-                        log.warn("[{}] failed to update managed-ledger config", topicName, e);
+
+            forEachTopic(topic -> {
+                try {
+                    if (topic instanceof PersistentTopic) {
+                        PersistentTopic persistentTopic = (PersistentTopic) topic;
+                        // update skipNonRecoverableLedger configuration
+                        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+                                pulsar.getConfiguration().isAutoSkipNonRecoverableData());
                     }
+                } catch (Exception e) {
+                    log.warn("[{}] failed to update managed-ledger config", topic.getName(), e);
                 }
             });
         });
@@ -1437,23 +1422,20 @@ public boolean isBrokerDispatchingBlocked() {
     private void blockDispatchersWithLargeUnAckMessages() {
         lock.readLock().lock();
         try {
-            topics.forEach((name, topicFuture) -> {
-                Optional<Topic> topic = extractTopic(topicFuture);
-                if (topic.isPresent()) {
-                    topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
-                        if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
-                            PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
-                                    .getDispatcher();
-                            int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
-                            if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
-                                log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
-                                        dispatcher.getName(), dispatcher.getTotalUnackedMessages());
-                                dispatcher.blockDispatcherOnUnackedMsgs();
-                                blockedDispatchers.add(dispatcher);
-                            }
+            forEachTopic(topic -> {
+                topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+                    if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+                        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
+                                .getDispatcher();
+                        int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
+                        if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
+                            log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
+                                    dispatcher.getName(), dispatcher.getTotalUnackedMessages());
+                            dispatcher.blockDispatcherOnUnackedMsgs();
+                            blockedDispatchers.add(dispatcher);
                         }
-                    });
-                }
+                    }
+                });
             });
         } finally {
             lock.readLock().unlock();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fbf3c65de3..fccc75bb60 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -101,6 +101,8 @@ default long getOriginalSequenceId() {
 
     void checkGC(int gcInterval);
 
+    void checkInactiveSubscriptions();
+
     void checkMessageExpiry();
 
     void checkMessageDeduplicationInfo();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e45016c309..cddfea362a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -912,6 +912,11 @@ public void checkGC(int gcIntervalInSeconds) {
         }
     }
 
+    @Override
+    public void checkInactiveSubscriptions() {
+        // no-op
+    }
+
     @Override
     public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
         if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 65f5f97634..bdace5513a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -96,6 +96,7 @@ public Topic getTopic() {
 
     @Override
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+        cursor.updateLastActive();
         if (IS_FENCED_UPDATER.get(this) == TRUE) {
             log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
             throw new SubscriptionFencedException("Subscription is fenced");
@@ -144,6 +145,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        cursor.updateLastActive();
         if (dispatcher != null) {
             dispatcher.removeConsumer(consumer);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c59bd9839..d56bf55b52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1462,6 +1462,19 @@ public void checkGC(int gcIntervalInSeconds) {
         }
     }
 
+    @Override
+    public void checkInactiveSubscriptions() {
+        final long expirationTime = TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
+        if (expirationTime <= 0) return;
+        subscriptions.forEach((subName, sub) -> {
+            if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) return;
+            if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTime) {
+                sub.delete().thenAccept(
+                        v -> log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName));
+            }
+        });
+    }
+
     /**
      * Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
      * marked as inactive.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index 914a25c7e4..697f16a3b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import com.google.common.collect.Maps;
+
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.TopicName;
 
-import com.google.common.collect.Maps;
-
 /**
  */
 public class BookieClientStatsGenerator {
@@ -49,10 +46,10 @@ public BookieClientStatsGenerator(PulsarService pulsar) {
 
     private Map<String, Map<String, PendingBookieOpsStats>> generate() throws Exception {
         if (pulsar.getBrokerService() != null && pulsar.getBrokerService().getTopics() != null) {
-            pulsar.getBrokerService().getTopics().forEach((name, topicFuture) -> {
-                Optional<Topic> topic = BrokerService.extractTopic(topicFuture);
-                if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
-                    PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+
+            pulsar.getBrokerService().forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
                     TopicName topicName = TopicName.get(persistentTopic.getName());
                     put(topicName, persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
                 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services