You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:37:14 UTC
[pulsar] 02/02: Monitor if a cursor moves its mark-delete position (#8930)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4ed43c9121b1a92235effc8c534234dc99a5e67d
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jan 5 19:22:59 2021 -0800
Monitor if a cursor moves its mark-delete position (#8930)
Motivation
Neither msgBacklog nor storageSize gives a clear indication of
whether the mark-delete position has advanced. Add a new metric
to SubscriptionStats to monitor whether a subscription's
mark-delete position has advanced.
(cherry picked from commit 2fd878a8f4bfdd5cffa7fa6450714ec1ad25f514)
---
.../pulsar/broker/namespace/NamespaceService.java | 2 +-
.../persistent/PersistentMessageExpiryMonitor.java | 5 ++++
.../service/persistent/PersistentSubscription.java | 9 +++++++
.../prometheus/AggregatedSubscriptionStats.java | 8 ++++++
.../stats/prometheus/NamespaceStatsAggregator.java | 6 ++++-
.../pulsar/broker/stats/prometheus/TopicStats.java | 8 ++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 30 +++++++++++++++++-----
.../common/policies/data/SubscriptionStats.java | 4 +++
8 files changed, 63 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 99b661e..580ca0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -241,7 +241,7 @@ public class NamespaceService {
/**
* Return the URL of the broker who's owning a particular service unit in asynchronous way
*
- * If the service unit is not owned, return a CompletableFuture with empty optional
+ * If the service unit is not owned, return a CompletableFuture with empty optional.
*/
public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) throws Exception {
if (suName instanceof TopicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 315a0a3..502dd2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
@@ -132,7 +133,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
public void findEntryComplete(Position position, Object ctx) {
if (position != null) {
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
+ Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
+ if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
+ subscription.updateLastMarkDeleteAdvancedTimestamp();
+ }
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] No messages to expire", topicName, subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e75763f..2c87f02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -93,6 +93,7 @@ public class PersistentSubscription implements Subscription {
private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;
+ private long lastMarkDeleteAdvancedTimestamp = 0L;
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
@@ -139,6 +140,11 @@ public class PersistentSubscription implements Subscription {
IS_FENCED_UPDATER.set(this, FALSE);
}
+ public void updateLastMarkDeleteAdvancedTimestamp() {
+ this.lastMarkDeleteAdvancedTimestamp =
+ Math.max(this.lastMarkDeleteAdvancedTimestamp, System.currentTimeMillis());
+ }
+
@Override
public BrokerInterceptor interceptor() {
return topic.getBrokerService().getInterceptor();
@@ -334,6 +340,8 @@ public class PersistentSubscription implements Subscription {
}
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
+ this.updateLastMarkDeleteAdvancedTimestamp();
+
// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
@@ -896,6 +904,7 @@ public class PersistentSubscription implements Subscription {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
+ subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared?
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index f3573f7..ce07180 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -47,6 +47,14 @@ public class AggregatedSubscriptionStats {
long lastExpireTimestamp;
+ long lastConsumedFlowTimestamp;
+
+ long lastConsumedTimestamp;
+
+ long lastAckedTimestamp;
+
+ long lastMarkDeleteAdvancedTimestamp;
+
double msgRateExpired;
long totalMsgExpired;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3e52c44..3364214 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -138,10 +138,14 @@ public class NamespaceStatsAggregator {
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
- subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+ subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+ subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
+ subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
+ subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
+ subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index d27e863..1d39d58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -177,6 +177,14 @@ class TopicStats {
subsStats.msgOutCounter);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
subsStats.lastExpireTimestamp);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
+ subsStats.lastAckedTimestamp);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
+ subsStats.lastConsumedFlowTimestamp);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
+ subsStats.lastConsumedTimestamp);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+ subsStats.lastMarkDeleteAdvancedTimestamp);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
subsStats.msgRateExpired);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ad9939c..67d2117 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2073,17 +2073,29 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
- assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
- assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
- assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
+ SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
+ assertEquals(subStats1.msgBacklog, 0);
+ assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+ SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
+ assertEquals(subStats2.msgBacklog, 10);
+ assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
+ SubscriptionStats subStats3 = topicStats.subscriptions.get("my-sub3");
+ assertEquals(subStats3.msgBacklog, 10);
+ assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
- assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
- assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 0);
- assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 0);
+ SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
+ assertEquals(newSubStats1.msgBacklog, 0);
+ assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
+ SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
+ assertEquals(newSubStats2.msgBacklog, 0);
+ assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
+ SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
+ assertEquals(newSubStats3.msgBacklog, 0);
+ assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
consumer1.close();
consumer2.close();
@@ -2523,6 +2535,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
producer.send(new byte[1024 * i * 5]);
}
+ TopicStats topicStats = admin.topics().getStats(topic);
+ assertEquals(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp, 0L);
+
for (int i = 0; i < messages; i++) {
consumer.acknowledgeCumulative(consumer.receive());
}
@@ -2530,8 +2545,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
// Wait ack send
Thread.sleep(1000);
- TopicStats topicStats = admin.topics().getStats(topic);
+ topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.backlogSize, 0);
+ assertTrue(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp > 0L);
}
@Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 12c5767..cd3d6d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -89,6 +89,9 @@ public class SubscriptionStats {
/** Last acked message timestamp. */
public long lastAckedTimestamp;
+ /** Last MarkDelete position advanced timestamp. */
+ public long lastMarkDeleteAdvancedTimestamp;
+
/** List of connected consumers on this subscription w/ their stats. */
public List<ConsumerStats> consumers;
@@ -124,6 +127,7 @@ public class SubscriptionStats {
msgRateExpired = 0;
totalMsgExpired = 0;
lastExpireTimestamp = 0L;
+ lastMarkDeleteAdvancedTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;