You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 08:54:35 UTC
[pulsar] 02/04: [Admin] Expose offloaded storage size to the admin
stats (#9335)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 41efa5d318b998ae4e39c2ecde3cbfd8b00f46bb
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Jan 29 03:08:57 2021 +0800
[Admin] Expose offloaded storage size to the admin stats (#9335)
*Motivation*
Add offloaded storage size in the topic stats.
(cherry picked from commit 2af0aedf4e11aa0b29a1706e48d1355196ab7eda)
---
.../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../java/org/apache/pulsar/broker/service/BrokerServiceTest.java | 5 +++++
.../main/java/org/apache/pulsar/common/policies/data/TopicStats.java | 5 +++++
.../apache/pulsar/common/policies/data/PersistentTopicStatsTest.java | 3 +++
4 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00dcd81..5ad3394 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1636,7 +1636,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.storageSize = ledger.getTotalSize();
stats.backlogSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
-
+ stats.offloadedStorageSize = ledger.getOffloadedSize();
return stats;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 97ddcca..5688f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -169,6 +169,9 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);
+ // storage stats
+ assertEquals(stats.offloadedStorageSize, 0);
+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -206,6 +209,7 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertNotNull(subStats.consumers.get(0).getClientVersion());
+ assertEquals(stats.offloadedStorageSize, 0);
Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
@@ -218,6 +222,7 @@ public class BrokerServiceTest extends BrokerTestBase {
rolloverPerIntervalStats();
stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
+ assertEquals(stats.offloadedStorageSize, 0);
assertEquals(subStats.msgBacklog, 0);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index bc6716b..763f315 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -67,6 +67,9 @@ public class TopicStats {
/** Get estimated total unconsumed or backlog size in bytes. */
public long backlogSize;
+ /** Space used to store the offloaded messages for the topic/. */
+ public long offloadedStorageSize;
+
/** List of connected publishers on this topic w/ their stats. */
public List<PublisherStats> publishers;
@@ -109,6 +112,7 @@ public class TopicStats {
this.deduplicationStatus = null;
this.nonContiguousDeletedMessagesRanges = 0;
this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
+ this.offloadedStorageSize = 0;
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -128,6 +132,7 @@ public class TopicStats {
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
this.backlogSize += stats.backlogSize;
+ this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
if (this.publishers.size() != stats.publishers.size()) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index cdbfcd5..dcfbd20 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -37,6 +37,7 @@ public class PersistentTopicStatsTest {
topicStats.msgThroughputOut = 1;
topicStats.averageMsgSize = 1;
topicStats.storageSize = 1;
+ topicStats.offloadedStorageSize = 1;
topicStats.publishers.add(new PublisherStats());
topicStats.subscriptions.put("test_ns", new SubscriptionStats());
topicStats.replication.put("test_ns", new ReplicatorStats());
@@ -47,6 +48,7 @@ public class PersistentTopicStatsTest {
assertEquals(topicStats.msgRateOut, 1.0);
assertEquals(topicStats.msgThroughputOut, 1.0);
assertEquals(topicStats.averageMsgSize, 1.0);
+ assertEquals(topicStats.offloadedStorageSize, 1);
assertEquals(topicStats.storageSize, 1);
assertEquals(topicStats.publishers.size(), 1);
assertEquals(topicStats.subscriptions.size(), 1);
@@ -61,6 +63,7 @@ public class PersistentTopicStatsTest {
assertEquals(topicStats.publishers.size(), 0);
assertEquals(topicStats.subscriptions.size(), 0);
assertEquals(topicStats.replication.size(), 0);
+ assertEquals(topicStats.offloadedStorageSize, 0);
}
@Test