You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 11:50:16 UTC

[pulsar] branch branch-2.6 updated: Expose non-contiguous deleted messages ranges stats. (#8936)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new a90d366  Expose non-contiguous deleted messages ranges stats. (#8936)
a90d366 is described below

commit a90d3660c12f7920a1e9b65ca26d4c045a7c0521
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Dec 14 12:39:23 2020 +0800

    Expose non-contiguous deleted messages ranges stats. (#8936)
    
    Expose non-contiguous deleted messages ranges stats.
    
    New test added.
    
    (cherry picked from commit 1654e8872d389ec0a9e9dd22203267fa6333dead)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  5 +++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 ++++++-
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 +++
 .../service/persistent/PersistentSubscription.java |  3 ++
 .../broker/service/persistent/PersistentTopic.java |  3 ++
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 43 ++++++++++++++++++++++
 .../common/policies/data/SubscriptionStats.java    | 11 ++++++
 .../pulsar/common/policies/data/TopicStats.java    | 10 +++++
 8 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 345b215..2a5f03e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -607,6 +607,11 @@ public interface ManagedCursor {
     int getTotalNonContiguousDeletedMessagesRange();
 
     /**
+     * Returns the serialized size of the individually deleted (non-contiguous) message ranges.
+     */
+    int getNonContiguousDeletedMessagesRangeSerializedSize();
+
+    /**
      * Returns the estimated size of the unacknowledged backlog for this cursor
      *
      * @return the estimated size from the mark delete position of the cursor
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b3fefc9..df66e91 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -182,6 +183,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     private long entriesReadCount;
     private long entriesReadSize;
+    private int individualDeletedMessagesSerializedSize;
 
     class MarkDeleteEntry {
         final PositionImpl newPosition;
@@ -824,6 +826,11 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
+    public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+        return this.individualDeletedMessagesSerializedSize;
+    }
+
+    @Override
     public long getEstimatedSizeSinceMarkDeletePosition() {
         return ledger.estimateBacklogFromPosition(markDeletePosition);
     }
@@ -2417,6 +2424,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
                     .newBuilder();
             MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
+            AtomicInteger acksSerializedSize = new AtomicInteger(0);
             List<MessageRange> rangeList = Lists.newArrayList();
             individualDeletedMessages.forEach((positionRange) -> {
                 PositionImpl p = positionRange.lowerEndpoint();
@@ -2427,9 +2435,12 @@ public class ManagedCursorImpl implements ManagedCursor {
                 nestedPositionBuilder.setLedgerId(p.getLedgerId());
                 nestedPositionBuilder.setEntryId(p.getEntryId());
                 messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
-                rangeList.add(messageRangeBuilder.build());
+                MessageRange messageRange = messageRangeBuilder.build();
+                acksSerializedSize.addAndGet(messageRange.getSerializedSize());
+                rangeList.add(messageRange);
                 return rangeList.size() <= config.getMaxUnackedRangesToPersist();
             });
+            this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
             return rangeList;
         } finally {
             lock.readLock().unlock();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 32e55de..5733671 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -298,6 +298,11 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+            return 0;
+        }
+
+        @Override
         public long getEstimatedSizeSinceMarkDeletePosition() {
             return 0L;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index aa9f263..51b1e99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1030,6 +1030,9 @@ public class PersistentSubscription implements Subscription {
                 });
             }
         }
+        subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
+        subStats.nonContiguousDeletedMessagesRangesSerializedSize =
+                cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
         return subStats;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 307e41d..7d832d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1546,6 +1546,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             stats.bytesOutCounter += subStats.bytesOutCounter;
             stats.msgOutCounter += subStats.msgOutCounter;
             stats.subscriptions.put(name, subStats);
+            stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
+            stats.nonContiguousDeletedMessagesRangesSerializedSize +=
+                    subStats.nonContiguousDeletedMessagesRangesSerializedSize;
         });
 
         replicators.forEach((cluster, replicator) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index e441e19..25b0861 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -22,17 +22,20 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class SubscriptionStatsTest extends ProducerConsumerBase {
@@ -91,4 +94,44 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
         consumer2.close();
         producer.close();
     }
+
+    @Test
+    public void testNonContiguousDeletedMessagesRanges() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testNonContiguousDeletedMessagesRanges-"
+                + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            if (i != 50) {
+                consumer.acknowledge(received);
+            }
+        }
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            TopicStats stats = admin.topics().getStats(topicName);
+            Assert.assertEquals(stats.nonContiguousDeletedMessagesRanges, 1);
+            Assert.assertEquals(stats.subscriptions.size(), 1);
+            Assert.assertEquals(stats.subscriptions.get(subName).nonContiguousDeletedMessagesRanges, 1);
+            Assert.assertTrue(stats.nonContiguousDeletedMessagesRangesSerializedSize > 0);
+            Assert.assertTrue(stats.subscriptions.get(subName)
+                    .nonContiguousDeletedMessagesRangesSerializedSize > 0);
+        });
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index e5e6feb..c3a09ab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -98,6 +98,12 @@ public class SubscriptionStats {
     /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
     public Map<String, String> consumersAfterMarkDeletePosition;
 
+    /** The number of non-contiguous deleted messages ranges. */
+    public int nonContiguousDeletedMessagesRanges;
+
+    /** The serialized size of non-contiguous deleted messages ranges. */
+    public int nonContiguousDeletedMessagesRangesSerializedSize;
+
     public SubscriptionStats() {
         this.consumers = Lists.newArrayList();
         this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
@@ -115,6 +121,9 @@ public class SubscriptionStats {
         msgRateExpired = 0;
         lastExpireTimestamp = 0L;
         consumers.clear();
+        consumersAfterMarkDeletePosition.clear();
+        nonContiguousDeletedMessagesRanges = 0;
+        nonContiguousDeletedMessagesRangesSerializedSize = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -143,6 +152,8 @@ public class SubscriptionStats {
             }
         }
         this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
+        this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
+        this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         return this;
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 8d560a9..bc6716b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -78,6 +78,12 @@ public class TopicStats {
 
     public String deduplicationStatus;
 
+    /** The number of non-contiguous deleted messages ranges. */
+    public int nonContiguousDeletedMessagesRanges;
+
+    /** The serialized size of non-contiguous deleted messages ranges. */
+    public int nonContiguousDeletedMessagesRangesSerializedSize;
+
     public TopicStats() {
         this.publishers = Lists.newArrayList();
         this.subscriptions = Maps.newHashMap();
@@ -101,6 +107,8 @@ public class TopicStats {
         this.subscriptions.clear();
         this.replication.clear();
         this.deduplicationStatus = null;
+        this.nonContiguousDeletedMessagesRanges = 0;
+        this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -120,6 +128,8 @@ public class TopicStats {
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
         this.backlogSize += stats.backlogSize;
+        this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
+        this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         if (this.publishers.size() != stats.publishers.size()) {
             for (int i = 0; i < stats.publishers.size(); i++) {
                 PublisherStats publisherStats = new PublisherStats();