You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 16:36:32 UTC
[incubator-pulsar] branch master updated: Delete inactive
subscriptions automatically (#1352)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 762036c Delete inactive subscriptions automatically (#1352)
762036c is described below
commit 762036c36f071d0aca148bf1d333b3df0da66191
Author: yush1ga <y....@gmail.com>
AuthorDate: Wed May 2 01:36:30 2018 +0900
Delete inactive subscriptions automatically (#1352)
* Delete inactive subscriptions automatically
* Addressed PR comments
* Add subscriptionExpiryCheckIntervalInMinutes
* Add lastActive for ManagedCursorInfo
---
conf/broker.conf | 7 +
conf/standalone.conf | 7 +
.../apache/bookkeeper/mledger/ManagedCursor.java | 13 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 26 +++-
.../bookkeeper/mledger/proto/MLDataFormats.java | 65 +++++++-
managed-ledger/src/main/proto/MLDataFormats.proto | 2 +
.../mledger/impl/ManagedCursorContainerTest.java | 10 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 21 +++
.../broker/admin/v1/NonPersistentTopics.java | 10 +-
.../broker/admin/v2/NonPersistentTopics.java | 10 +-
.../pulsar/broker/service/BrokerService.java | 166 +++++++++------------
.../org/apache/pulsar/broker/service/Topic.java | 2 +
.../service/nonpersistent/NonPersistentTopic.java | 5 +
.../service/persistent/PersistentSubscription.java | 2 +
.../broker/service/persistent/PersistentTopic.java | 13 ++
.../broker/stats/BookieClientStatsGenerator.java | 15 +-
16 files changed, 252 insertions(+), 122 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index dc7ca04..893a3e2 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -76,6 +76,13 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000
+# How long (in minutes) a subscription may remain inactive, counted from its last consumer activity, before it is deleted
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently (in minutes) to proactively check for and purge expired subscriptions
+subscriptionExpiryCheckIntervalInMinutes=5
+
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f5c9546..bc6dc10 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -69,6 +69,13 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000
+# How long (in minutes) a subscription may remain inactive, counted from its last consumer activity, before it is deleted
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently (in minutes) to proactively check for and purge expired subscriptions
+subscriptionExpiryCheckIntervalInMinutes=5
+
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index f6793b4..186a450 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -58,6 +58,19 @@ public interface ManagedCursor {
String getName();
/**
+ * Get the last active time of the cursor.
+ *
+ * @return the last active time of the cursor
+ */
+ long getLastActive();
+
+ /**
+ * Update the last active time of the cursor
+ *
+ */
+ void updateLastActive();
+
+ /**
* Return any properties that were associated with the last stored position.
*/
Map<String, Long> getProperties();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 695f22d..194e8c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -158,6 +158,9 @@ public class ManagedCursorImpl implements ManagedCursor {
private long lastLedgerSwitchTimestamp;
private final Clock clock;
+ // The last active time (Unix time, milliseconds) of the cursor
+ private long lastActive;
+
enum State {
Uninitialized, // Cursor is being initialized
NoLedger, // There is no metadata ledger open for writing
@@ -189,6 +192,7 @@ public class ManagedCursorImpl implements ManagedCursor {
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
this.clock = config.getClock();
+ this.lastActive = this.clock.millis();
this.lastLedgerSwitchTimestamp = this.clock.millis();
if (config.getThrottleMarkDelete() > 0.0) {
@@ -216,6 +220,7 @@ public class ManagedCursorImpl implements ManagedCursor {
public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
+ lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;
if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
@@ -1280,7 +1285,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
-
+
if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
@@ -1307,7 +1312,7 @@ public class ManagedCursorImpl implements ManagedCursor {
final MarkDeleteCallback callback, final Object ctx) {
checkNotNull(position);
checkArgument(position instanceof PositionImpl);
-
+
if (STATE_UPDATER.get(this) == State.Closed) {
callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
@@ -1328,7 +1333,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}
PositionImpl newPosition = (PositionImpl) position;
-
+
if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug(
@@ -1541,7 +1546,7 @@ public class ManagedCursorImpl implements ManagedCursor {
for (Position pos : positions) {
PositionImpl position = (PositionImpl) checkNotNull(pos);
-
+
if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
if (log.isDebugEnabled()) {
log.debug(
@@ -1693,6 +1698,16 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@Override
+ public long getLastActive() {
+ return lastActive;
+ }
+
+ @Override
+ public void updateLastActive() {
+ lastActive = System.currentTimeMillis();
+ }
+
+ @Override
public boolean isDurable() {
return true;
}
@@ -1837,7 +1852,8 @@ public class ManagedCursorImpl implements ManagedCursor {
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
.setCursorsLedgerId(cursorsLedgerId) //
.setMarkDeleteLedgerId(position.getLedgerId()) //
- .setMarkDeleteEntryId(position.getEntryId()); //
+ .setMarkDeleteEntryId(position.getEntryId()) //
+ .setLastActive(lastActive); //
info.addAllProperties(buildPropertiesMap(properties));
if (persistIndividualDeletedMessageRanges) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
index 73ba1da..fc607b9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java
@@ -4428,6 +4428,10 @@ public final class MLDataFormats {
getPropertiesOrBuilderList();
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongPropertyOrBuilder getPropertiesOrBuilder(
int index);
+
+ // optional int64 lastActive = 6;
+ boolean hasLastActive();
+ long getLastActive();
}
public static final class ManagedCursorInfo extends
com.google.protobuf.GeneratedMessage
@@ -4530,12 +4534,23 @@ public final class MLDataFormats {
return properties_.get(index);
}
+ // optional int64 lastActive = 6;
+ public static final int LASTACTIVE_FIELD_NUMBER = 6;
+ private long lastActive_;
+ public boolean hasLastActive() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getLastActive() {
+ return lastActive_;
+ }
+
private void initFields() {
cursorsLedgerId_ = 0L;
markDeleteLedgerId_ = 0L;
markDeleteEntryId_ = 0L;
individualDeletedMessages_ = java.util.Collections.emptyList();
properties_ = java.util.Collections.emptyList();
+ lastActive_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4580,6 +4595,9 @@ public final class MLDataFormats {
for (int i = 0; i < properties_.size(); i++) {
output.writeMessage(5, properties_.get(i));
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt64(6, lastActive_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4609,6 +4627,10 @@ public final class MLDataFormats {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, properties_.get(i));
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(6, lastActive_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4753,6 +4775,8 @@ public final class MLDataFormats {
} else {
propertiesBuilder_.clear();
}
+ lastActive_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -4821,6 +4845,10 @@ public final class MLDataFormats {
} else {
result.properties_ = propertiesBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.lastActive_ = lastActive_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4898,6 +4926,9 @@ public final class MLDataFormats {
}
}
}
+ if (other.hasLastActive()) {
+ setLastActive(other.getLastActive());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4972,6 +5003,11 @@ public final class MLDataFormats {
addProperties(subBuilder.buildPartial());
break;
}
+ case 48: {
+ bitField0_ |= 0x00000020;
+ lastActive_ = input.readInt64();
+ break;
+ }
}
}
}
@@ -5413,6 +5449,27 @@ public final class MLDataFormats {
return propertiesBuilder_;
}
+ // optional int64 lastActive = 6;
+ private long lastActive_ ;
+ public boolean hasLastActive() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getLastActive() {
+ return lastActive_;
+ }
+ public Builder setLastActive(long value) {
+ bitField0_ |= 0x00000020;
+ lastActive_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearLastActive() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ lastActive_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:ManagedCursorInfo)
}
@@ -5491,13 +5548,13 @@ public final class MLDataFormats {
"Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" +
"itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" +
"dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " +
- "\002(\t\022\r\n\005value\030\002 \002(\003\"\270\001\n\021ManagedCursorInfo" +
+ "\002(\t\022\r\n\005value\030\002 \002(\003\"\314\001\n\021ManagedCursorInfo" +
"\022\027\n\017cursorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteL",
"edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003" +
"\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" +
"ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" +
- "pertyB\'\n#org.apache.bookkeeper.mledger.p" +
- "rotoH\001"
+ "perty\022\022\n\nlastActive\030\006 \001(\003B\'\n#org.apache." +
+ "bookkeeper.mledger.protoH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5565,7 +5622,7 @@ public final class MLDataFormats {
internal_static_ManagedCursorInfo_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ManagedCursorInfo_descriptor,
- new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", },
+ new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", "LastActive", },
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.class,
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.Builder.class);
return null;
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 29f465c..0d5ad3a 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -86,4 +86,6 @@ message ManagedCursorInfo {
// Additional custom properties associated with
// the current cursor position
repeated LongProperty properties = 5;
+
+ optional int64 lastActive = 6;
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index de98b60..c9021ae 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -122,6 +122,16 @@ public class ManagedCursorContainerTest {
return name;
}
+ @Override
+ public long getLastActive() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public void updateLastActive() {
+ // no-op
+ }
+
public String toString() {
return String.format("%s=%s", name, position);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5cf31c1..9a69fc3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -93,6 +93,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int messageExpiryCheckIntervalInMinutes = 5;
// How long to delay rewinding cursor and dispatching messages when active consumer is changed
private int activeConsumerFailoverDelayTimeMillis = 1000;
+ // How long (in minutes) a subscription may remain inactive, counted from its last consumer activity, before it is deleted
+ // When it is 0, inactive subscriptions are not deleted automatically
+ private long subscriptionExpirationTimeMinutes = 0;
+ // How frequently (in minutes) to proactively check for and purge expired subscriptions
+ private long subscriptionExpiryCheckIntervalInMinutes = 5;
// Set the default behavior for message deduplication in the broker
// This can be overridden per-namespace. If enabled, broker will reject
@@ -681,6 +686,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis;
}
+ public long getSubscriptionExpirationTimeMinutes() {
+ return subscriptionExpirationTimeMinutes;
+ }
+
+ public void setSubscriptionExpirationTimeMinutes(long subscriptionExpirationTimeMinutes) {
+ this.subscriptionExpirationTimeMinutes = subscriptionExpirationTimeMinutes;
+ }
+
+ public long getSubscriptionExpiryCheckIntervalInMinutes() {
+ return subscriptionExpiryCheckIntervalInMinutes;
+ }
+
+ public void setSubscriptionExpiryCheckIntervalInMinutes(long subscriptionExpiryCheckIntervalInMinutes) {
+ this.subscriptionExpiryCheckIntervalInMinutes = subscriptionExpiryCheckIntervalInMinutes;
+ }
+
public boolean isClientLibraryVersionCheckEnabled() {
return clientLibraryVersionCheckEnabled;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index e9fbe0a..f73a829 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -236,12 +236,10 @@ public class NonPersistentTopics extends PersistentTopics {
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
try {
final List<String> topicList = Lists.newArrayList();
- pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
- if (BrokerService.extractTopic(topicFuture).isPresent()) {
- TopicName topicName = TopicName.get(name);
- if (nsBundle.includes(topicName)) {
- topicList.add(name);
- }
+ pulsar().getBrokerService().forEachTopic(topic -> {
+ TopicName topicName = TopicName.get(topic.getName());
+ if (nsBundle.includes(topicName)) {
+ topicList.add(topic.getName());
}
});
return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 8f616d2..9d49ad1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -227,12 +227,10 @@ public class NonPersistentTopics extends PersistentTopics {
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
try {
final List<String> topicList = Lists.newArrayList();
- pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
- if (BrokerService.extractTopic(topicFuture).isPresent()) {
- TopicName topicName = TopicName.get(name);
- if (nsBundle.includes(topicName)) {
- topicList.add(name);
- }
+ pulsar().getBrokerService().forEachTopic(topic -> {
+ TopicName topicName = TopicName.get(topic.getName());
+ if (nsBundle.includes(topicName)) {
+ topicList.add(topic.getName());
}
});
return topicList;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad709db..43f5000 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -109,9 +109,9 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
@@ -324,10 +324,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
// Deduplication info checker
- long intervalInSeconds = TimeUnit.MINUTES
+ long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3;
- inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), intervalInSeconds,
- intervalInSeconds, TimeUnit.SECONDS);
+ inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), duplicationCheckerIntervalInSeconds,
+ duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
+
+ // Inactive subscription checker
+ if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) {
+ long subscriptionExpiryCheckIntervalInSeconds =
+ TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
+ inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
+ subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS);
+ }
}
void startMessageExpiryMonitor() {
@@ -813,28 +821,29 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public void checkGC(int gcIntervalInSeconds) {
- topics.forEach((n, t) -> {
- Optional<Topic> topic = extractTopic(t);
- if (topic.isPresent()) {
- topic.get().checkGC(gcIntervalInSeconds);
- }
- });
+ forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
}
public void checkMessageExpiry() {
- topics.forEach((n, t) -> {
- Optional<Topic> topic = extractTopic(t);
- if (topic.isPresent()) {
- topic.get().checkMessageExpiry();
- }
- });
+ forEachTopic(Topic::checkMessageExpiry);
}
public void checkMessageDeduplicationInfo() {
+ forEachTopic(Topic::checkMessageDeduplicationInfo);
+ }
+
+ public void checkInactiveSubscriptions() {
+ forEachTopic(Topic::checkInactiveSubscriptions);
+ }
+
+ /**
+ * Iterates over all loaded topics in the broker
+ */
+ public void forEachTopic(Consumer<Topic> consumer) {
topics.forEach((n, t) -> {
Optional<Topic> topic = extractTopic(t);
if (topic.isPresent()) {
- topic.get().checkMessageDeduplicationInfo();
+ consumer.accept(topic.get());
}
});
}
@@ -866,28 +875,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public void monitorBacklogQuota() {
- topics.forEach((n, t) -> {
- try {
- Optional<Topic> optionalTopic = extractTopic(t);
- if (optionalTopic.isPresent() && optionalTopic.get() instanceof PersistentTopic) {
- PersistentTopic topic = (PersistentTopic) optionalTopic.get();
- if (isBacklogExceeded(topic)) {
- getBacklogQuotaManager().handleExceededBacklogQuota(topic);
- } else if (topic == null) {
- if (log.isDebugEnabled()) {
- log.debug("topic is null ");
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("quota not exceeded for [{}]", topic.getName());
- }
+ forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ if (isBacklogExceeded(persistentTopic)) {
+ getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("quota not exceeded for [{}]", topic.getName());
}
}
- } catch (Exception xle) {
- log.warn("Backlog quota monitoring encountered :" + xle.getLocalizedMessage());
}
});
-
}
void checkTopicNsOwnership(final String topic) throws RuntimeException {
@@ -1035,12 +1034,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
public Map<String, TopicStats> getTopicStats() {
HashMap<String, TopicStats> stats = new HashMap<>();
- topics.forEach((name, topicFuture) -> {
- Optional<Topic> topic = extractTopic(topicFuture);
- if (topic.isPresent()) {
- stats.put(name, topic.get().getStats());
- }
- });
+
+ forEachTopic(topic -> stats.put(topic.getName(), topic.getStats()));
+
return stats;
}
@@ -1130,11 +1126,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
- topics.forEach((name, topicFuture) -> {
- Optional<Topic> topic = extractTopic(topicFuture);
-
- if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
- PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+ forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
// it first checks namespace-policy rate and if not present then applies broker-config
persistentTopic.getDispatchRateLimiter().updateDispatchRate();
}
@@ -1145,21 +1139,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void updateSubscriptionMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
- topics.forEach((name, topicFuture) -> {
- Optional<Topic> topic = extractTopic(topicFuture);
-
- if (topic.isPresent()) {
- topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
- if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
- ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
- .getDispatchRateLimiter().updateDispatchRate();
- } else if (persistentSubscription
- .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
- ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
- .getDispatchRateLimiter().updateDispatchRate();
- }
- });
- }
+ forEachTopic(topic -> {
+ topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+ ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher())
+ .getDispatchRateLimiter().updateDispatchRate();
+ } else if (persistentSubscription
+ .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
+ ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher())
+ .getDispatchRateLimiter().updateDispatchRate();
+ }
+ });
});
});
}
@@ -1167,22 +1157,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void updateManagedLedgerConfig() {
this.pulsar().getExecutor().execute(() -> {
// update managed-ledger config of each topic
- topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- String topicName = null;
- try {
- Optional<Topic> topic = extractTopic(topicFuture);
-
- if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
- PersistentTopic persistentTopic = (PersistentTopic) topic.get();
- topicName = persistentTopic.getName();
- // update skipNonRecoverableLedger configuration
- persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
- pulsar.getConfiguration().isAutoSkipNonRecoverableData());
- }
- } catch (Exception e) {
- log.warn("[{}] failed to update managed-ledger config", topicName, e);
+
+ forEachTopic(topic -> {
+ try {
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+ // update skipNonRecoverableLedger configuration
+ persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+ pulsar.getConfiguration().isAutoSkipNonRecoverableData());
}
+ } catch (Exception e) {
+ log.warn("[{}] failed to update managed-ledger config", topic.getName(), e);
}
});
});
@@ -1437,23 +1422,20 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void blockDispatchersWithLargeUnAckMessages() {
lock.readLock().lock();
try {
- topics.forEach((name, topicFuture) -> {
- Optional<Topic> topic = extractTopic(topicFuture);
- if (topic.isPresent()) {
- topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
- if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
- PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
- .getDispatcher();
- int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
- if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
- log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
- dispatcher.getName(), dispatcher.getTotalUnackedMessages());
- dispatcher.blockDispatcherOnUnackedMsgs();
- blockedDispatchers.add(dispatcher);
- }
+ forEachTopic(topic -> {
+ topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
+ PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
+ .getDispatcher();
+ int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
+ if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
+ log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
+ dispatcher.getName(), dispatcher.getTotalUnackedMessages());
+ dispatcher.blockDispatcherOnUnackedMsgs();
+ blockedDispatchers.add(dispatcher);
}
- });
- }
+ }
+ });
});
} finally {
lock.readLock().unlock();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fbf3c65..fccc75b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -101,6 +101,8 @@ public interface Topic {
void checkGC(int gcInterval);
+ void checkInactiveSubscriptions();
+
void checkMessageExpiry();
void checkMessageDeduplicationInfo();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e45016c..cddfea3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -913,6 +913,11 @@ public class NonPersistentTopic implements Topic {
}
@Override
+ public void checkInactiveSubscriptions() {
+ // no-op
+ }
+
+ @Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 65f5f97..bdace55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -96,6 +96,7 @@ public class PersistentSubscription implements Subscription {
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+ cursor.updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
throw new SubscriptionFencedException("Subscription is fenced");
@@ -144,6 +145,7 @@ public class PersistentSubscription implements Subscription {
@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+ cursor.updateLastActive();
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c59bd9..d56bf55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1462,6 +1462,19 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
}
+ @Override
+ public void checkInactiveSubscriptions() {
+ final long expirationTime = TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
+ if (expirationTime <= 0) return;
+ subscriptions.forEach((subName, sub) -> {
+ if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) return;
+ if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTime) {
+ sub.delete().thenAccept(
+ v -> log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName));
+ }
+ });
+ }
+
/**
* Check whether the topic should be retained (based on time), even though there are no producers/consumers and it's
* marked as inactive.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index 914a25c..697f16a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -18,18 +18,15 @@
*/
package org.apache.pulsar.broker.stats;
+import com.google.common.collect.Maps;
+
import java.util.Map;
-import java.util.Optional;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
-import com.google.common.collect.Maps;
-
/**
*/
public class BookieClientStatsGenerator {
@@ -49,10 +46,10 @@ public class BookieClientStatsGenerator {
private Map<String, Map<String, PendingBookieOpsStats>> generate() throws Exception {
if (pulsar.getBrokerService() != null && pulsar.getBrokerService().getTopics() != null) {
- pulsar.getBrokerService().getTopics().forEach((name, topicFuture) -> {
- Optional<Topic> topic = BrokerService.extractTopic(topicFuture);
- if (topic.isPresent() && topic.get() instanceof PersistentTopic) {
- PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+
+ pulsar.getBrokerService().forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
TopicName topicName = TopicName.get(persistentTopic.getName());
put(topicName, persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.