You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 13:04:08 UTC
[pulsar] 02/07: Expose non-contiguous deleted messages ranges
stats. (#8936)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c83da2c6cc90cf157506e6fa983b217c4826ebd9
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Dec 14 12:39:23 2020 +0800
Expose non-contiguous deleted messages ranges stats. (#8936)
Expose non-contiguous deleted messages ranges stats.
New test added.
(cherry picked from commit 1654e8872d389ec0a9e9dd22203267fa6333dead)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 5 +++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 13 ++++++-
.../mledger/impl/ManagedCursorContainerTest.java | 5 +++
.../service/persistent/PersistentSubscription.java | 3 ++
.../broker/service/persistent/PersistentTopic.java | 3 ++
.../pulsar/broker/stats/SubscriptionStatsTest.java | 43 ++++++++++++++++++++++
.../common/policies/data/SubscriptionStats.java | 11 ++++++
.../pulsar/common/policies/data/TopicStats.java | 10 +++++
8 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 94642aa..7a7ac25 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -609,6 +609,11 @@ public interface ManagedCursor {
int getTotalNonContiguousDeletedMessagesRange();
/**
+ * Returns the serialized size of mark-Delete ranges.
+ */
+ int getNonContiguousDeletedMessagesRangeSerializedSize();
+
+ /**
* Returns the estimated size of the unacknowledged backlog for this cursor
*
* @return the estimated size from the mark delete position of the cursor
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1b7d22e..e9b96be 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -185,6 +186,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private long entriesReadCount;
private long entriesReadSize;
+ private int individualDeletedMessagesSerializedSize;
class MarkDeleteEntry {
final PositionImpl newPosition;
@@ -827,6 +829,11 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@Override
+ public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+ return this.individualDeletedMessagesSerializedSize;
+ }
+
+ @Override
public long getEstimatedSizeSinceMarkDeletePosition() {
return ledger.estimateBacklogFromPosition(markDeletePosition);
}
@@ -2421,6 +2428,7 @@ public class ManagedCursorImpl implements ManagedCursor {
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
+ AtomicInteger acksSerializedSize = new AtomicInteger(0);
List<MessageRange> rangeList = Lists.newArrayList();
individualDeletedMessages.forEach((positionRange) -> {
PositionImpl p = positionRange.lowerEndpoint();
@@ -2431,9 +2439,12 @@ public class ManagedCursorImpl implements ManagedCursor {
nestedPositionBuilder.setLedgerId(p.getLedgerId());
nestedPositionBuilder.setEntryId(p.getEntryId());
messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
- rangeList.add(messageRangeBuilder.build());
+ MessageRange messageRange = messageRangeBuilder.build();
+ acksSerializedSize.addAndGet(messageRange.getSerializedSize());
+ rangeList.add(messageRange);
return rangeList.size() <= config.getMaxUnackedRangesToPersist();
});
+ this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
return rangeList;
} finally {
lock.readLock().unlock();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 32e55de..5733671 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -298,6 +298,11 @@ public class ManagedCursorContainerTest {
}
@Override
+ public int getNonContiguousDeletedMessagesRangeSerializedSize() {
+ return 0;
+ }
+
+ @Override
public long getEstimatedSizeSinceMarkDeletePosition() {
return 0L;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0a578d0..57b5bbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -941,6 +941,9 @@ public class PersistentSubscription implements Subscription {
});
}
}
+ subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
+ subStats.nonContiguousDeletedMessagesRangesSerializedSize =
+ cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
return subStats;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3de20c5..3b91cf80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1609,6 +1609,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
+ stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
+ stats.nonContiguousDeletedMessagesRangesSerializedSize +=
+ subStats.nonContiguousDeletedMessagesRangesSerializedSize;
});
replicators.forEach((cluster, replicator) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index e441e19..25b0861 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -22,17 +22,20 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
@Slf4j
public class SubscriptionStatsTest extends ProducerConsumerBase {
@@ -91,4 +94,44 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
consumer2.close();
producer.close();
}
+
+ @Test
+ public void testNonContiguousDeletedMessagesRanges() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/testNonContiguousDeletedMessagesRanges-"
+ + UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ final int messages = 100;
+ for (int i = 0; i < messages; i++) {
+ producer.send(String.valueOf(i).getBytes());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ Message<byte[]> received = consumer.receive();
+ if (i != 50) {
+ consumer.acknowledge(received);
+ }
+ }
+
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ TopicStats stats = admin.topics().getStats(topicName);
+ Assert.assertEquals(stats.nonContiguousDeletedMessagesRanges, 1);
+ Assert.assertEquals(stats.subscriptions.size(), 1);
+ Assert.assertEquals(stats.subscriptions.get(subName).nonContiguousDeletedMessagesRanges, 1);
+ Assert.assertTrue(stats.nonContiguousDeletedMessagesRangesSerializedSize > 0);
+ Assert.assertTrue(stats.subscriptions.get(subName)
+ .nonContiguousDeletedMessagesRangesSerializedSize > 0);
+ });
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index e5e6feb..c3a09ab 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -98,6 +98,12 @@ public class SubscriptionStats {
/** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
public Map<String, String> consumersAfterMarkDeletePosition;
+ /** The number of non-contiguous deleted messages ranges. */
+ public int nonContiguousDeletedMessagesRanges;
+
+ /** The serialized size of non-contiguous deleted messages ranges. */
+ public int nonContiguousDeletedMessagesRangesSerializedSize;
+
public SubscriptionStats() {
this.consumers = Lists.newArrayList();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
@@ -115,6 +121,9 @@ public class SubscriptionStats {
msgRateExpired = 0;
lastExpireTimestamp = 0L;
consumers.clear();
+ consumersAfterMarkDeletePosition.clear();
+ nonContiguousDeletedMessagesRanges = 0;
+ nonContiguousDeletedMessagesRangesSerializedSize = 0;
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -143,6 +152,8 @@ public class SubscriptionStats {
}
}
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
+ this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
+ this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
return this;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 8d560a9..bc6716b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -78,6 +78,12 @@ public class TopicStats {
public String deduplicationStatus;
+ /** The number of non-contiguous deleted messages ranges. */
+ public int nonContiguousDeletedMessagesRanges;
+
+ /** The serialized size of non-contiguous deleted messages ranges. */
+ public int nonContiguousDeletedMessagesRangesSerializedSize;
+
public TopicStats() {
this.publishers = Lists.newArrayList();
this.subscriptions = Maps.newHashMap();
@@ -101,6 +107,8 @@ public class TopicStats {
this.subscriptions.clear();
this.replication.clear();
this.deduplicationStatus = null;
+ this.nonContiguousDeletedMessagesRanges = 0;
+ this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -120,6 +128,8 @@ public class TopicStats {
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
+ this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
+ this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
if (this.publishers.size() != stats.publishers.size()) {
for (int i = 0; i < stats.publishers.size(); i++) {
PublisherStats publisherStats = new PublisherStats();