You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:37:14 UTC

[pulsar] 02/02: Monitor if a cursor moves its mark-delete position (#8930)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4ed43c9121b1a92235effc8c534234dc99a5e67d
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jan 5 19:22:59 2021 -0800

    Monitor if a cursor moves its mark-delete position (#8930)
    
    Motivation
    
    Neither msgBacklog nor storageSize gives a clear indication of
    whether the mark-delete position has advanced. Add a new metric
    to SubscriptionStats to monitor when a subscription's mark-delete
    position last advanced.
    
    (cherry picked from commit 2fd878a8f4bfdd5cffa7fa6450714ec1ad25f514)
---
 .../pulsar/broker/namespace/NamespaceService.java  |  2 +-
 .../persistent/PersistentMessageExpiryMonitor.java |  5 ++++
 .../service/persistent/PersistentSubscription.java |  9 +++++++
 .../prometheus/AggregatedSubscriptionStats.java    |  8 ++++++
 .../stats/prometheus/NamespaceStatsAggregator.java |  6 ++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  8 ++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 30 +++++++++++++++++-----
 .../common/policies/data/SubscriptionStats.java    |  4 +++
 8 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 99b661e..580ca0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -241,7 +241,7 @@ public class NamespaceService {
     /**
      * Return the URL of the broker who's owning a particular service unit in asynchronous way
      *
-     * If the service unit is not owned, return a CompletableFuture with empty optional
+     * If the service unit is not owned, return a CompletableFuture with empty optional.
      */
     public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) throws Exception {
         if (suName instanceof TopicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 315a0a3..502dd2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
@@ -132,7 +133,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     public void findEntryComplete(Position position, Object ctx) {
         if (position != null) {
             log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
+            Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
             cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
+            if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
+                subscription.updateLastMarkDeleteAdvancedTimestamp();
+            }
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] No messages to expire", topicName, subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index e75763f..2c87f02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -93,6 +93,7 @@ public class PersistentSubscription implements Subscription {
 
     private long lastExpireTimestamp = 0L;
     private long lastConsumedFlowTimestamp = 0L;
+    private long lastMarkDeleteAdvancedTimestamp = 0L;
 
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
@@ -139,6 +140,11 @@ public class PersistentSubscription implements Subscription {
         IS_FENCED_UPDATER.set(this, FALSE);
     }
 
+    public void updateLastMarkDeleteAdvancedTimestamp() {
+        this.lastMarkDeleteAdvancedTimestamp =
+            Math.max(this.lastMarkDeleteAdvancedTimestamp, System.currentTimeMillis());
+    }
+
     @Override
     public BrokerInterceptor interceptor() {
         return topic.getBrokerService().getInterceptor();
@@ -334,6 +340,8 @@ public class PersistentSubscription implements Subscription {
         }
 
         if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
+            this.updateLastMarkDeleteAdvancedTimestamp();
+
             // Mark delete position advance
             ReplicatedSubscriptionSnapshotCache snapshotCache  = this.replicatedSubscriptionSnapshotCache;
             if (snapshotCache != null) {
@@ -896,6 +904,7 @@ public class PersistentSubscription implements Subscription {
         SubscriptionStats subStats = new SubscriptionStats();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
         subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
+        subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
         Dispatcher dispatcher = this.dispatcher;
         if (dispatcher != null) {
             Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared?
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index f3573f7..ce07180 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -47,6 +47,14 @@ public class AggregatedSubscriptionStats {
 
     long lastExpireTimestamp;
 
+    long lastConsumedFlowTimestamp;
+
+    long lastConsumedTimestamp;
+
+    long lastAckedTimestamp;
+
+    long lastMarkDeleteAdvancedTimestamp;
+
     double msgRateExpired;
 
     long totalMsgExpired;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3e52c44..3364214 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -138,10 +138,14 @@ public class NamespaceStatsAggregator {
                     .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
             subsStats.msgBacklog = subscriptionStats.msgBacklog;
             subsStats.msgDelayed = subscriptionStats.msgDelayed;
-            subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
             subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
             subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
             subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+            subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+            subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
+            subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
+            subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
+            subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
             subscriptionStats.consumers.forEach(cStats -> {
                 stats.consumersCount++;
                 subsStats.unackedMessages += cStats.unackedMessages;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index d27e863..1d39d58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -177,6 +177,14 @@ class TopicStats {
                     subsStats.msgOutCounter);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
                     subsStats.lastExpireTimestamp);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
+                subsStats.lastAckedTimestamp);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
+                subsStats.lastConsumedFlowTimestamp);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
+                subsStats.lastConsumedTimestamp);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+                subsStats.lastMarkDeleteAdvancedTimestamp);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
                     subsStats.msgRateExpired);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ad9939c..67d2117 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2073,17 +2073,29 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
 
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-        assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
-        assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
-        assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
+        SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+        SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 10);
+        assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
+        SubscriptionStats subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 10);
+        assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
         admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
         Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async
 
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-        assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 0);
-        assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 0);
-        assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 0);
+        SubscriptionStats newSubStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(newSubStats1.msgBacklog, 0);
+        assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, subStats1.lastMarkDeleteAdvancedTimestamp);
+        SubscriptionStats newSubStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(newSubStats2.msgBacklog, 0);
+        assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > subStats2.lastMarkDeleteAdvancedTimestamp);
+        SubscriptionStats newSubStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(newSubStats3.msgBacklog, 0);
+        assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > subStats3.lastMarkDeleteAdvancedTimestamp);
 
         consumer1.close();
         consumer2.close();
@@ -2523,6 +2535,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             producer.send(new byte[1024 * i * 5]);
         }
 
+        TopicStats topicStats = admin.topics().getStats(topic);
+        assertEquals(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp, 0L);
+
         for (int i = 0; i < messages; i++) {
             consumer.acknowledgeCumulative(consumer.receive());
         }
@@ -2530,8 +2545,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         // Wait ack send
         Thread.sleep(1000);
 
-        TopicStats topicStats = admin.topics().getStats(topic);
+        topicStats = admin.topics().getStats(topic);
         assertEquals(topicStats.backlogSize, 0);
+        assertTrue(topicStats.subscriptions.get("sub-1").lastMarkDeleteAdvancedTimestamp > 0L);
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 12c5767..cd3d6d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -89,6 +89,9 @@ public class SubscriptionStats {
     /** Last acked message timestamp. */
     public long lastAckedTimestamp;
 
+    /** Last MarkDelete position advanced timestamp. */
+    public long lastMarkDeleteAdvancedTimestamp;
+
     /** List of connected consumers on this subscription w/ their stats. */
     public List<ConsumerStats> consumers;
 
@@ -124,6 +127,7 @@ public class SubscriptionStats {
         msgRateExpired = 0;
         totalMsgExpired = 0;
         lastExpireTimestamp = 0L;
+        lastMarkDeleteAdvancedTimestamp = 0L;
         consumers.clear();
         consumersAfterMarkDeletePosition.clear();
         nonContiguousDeletedMessagesRanges = 0;