You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 08:54:35 UTC

[pulsar] 02/04: [Admin] Expose offloaded storage size to the admin stats (#9335)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 41efa5d318b998ae4e39c2ecde3cbfd8b00f46bb
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Jan 29 03:08:57 2021 +0800

    [Admin] Expose offloaded storage size to the admin stats (#9335)
    
    *Motivation*
    
    Add offloaded storage size in the topic stats.
    
    (cherry picked from commit 2af0aedf4e11aa0b29a1706e48d1355196ab7eda)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
 .../java/org/apache/pulsar/broker/service/BrokerServiceTest.java     | 5 +++++
 .../main/java/org/apache/pulsar/common/policies/data/TopicStats.java | 5 +++++
 .../apache/pulsar/common/policies/data/PersistentTopicStatsTest.java | 3 +++
 4 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00dcd81..5ad3394 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1636,7 +1636,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.storageSize = ledger.getTotalSize();
         stats.backlogSize = ledger.getEstimatedBacklogSize();
         stats.deduplicationStatus = messageDeduplication.getStatus().toString();
-
+        stats.offloadedStorageSize = ledger.getOffloadedSize();
         return stats;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 97ddcca..5688f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -169,6 +169,9 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(subStats.msgBacklog, 0);
         assertEquals(subStats.consumers.size(), 1);
 
+        // storage stats
+        assertEquals(stats.offloadedStorageSize, 0);
+
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
@@ -206,6 +209,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
         assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
         assertNotNull(subStats.consumers.get(0).getClientVersion());
+        assertEquals(stats.offloadedStorageSize, 0);
 
         Message<byte[]> msg;
         for (int i = 0; i < 10; i++) {
@@ -218,6 +222,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         rolloverPerIntervalStats();
         stats = topicRef.getStats(false);
         subStats = stats.subscriptions.values().iterator().next();
+        assertEquals(stats.offloadedStorageSize, 0);
 
         assertEquals(subStats.msgBacklog, 0);
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index bc6716b..763f315 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -67,6 +67,9 @@ public class TopicStats {
     /** Get estimated total unconsumed or backlog size in bytes. */
     public long backlogSize;
 
+    /** Space used to store the offloaded messages for the topic/. */
+    public long offloadedStorageSize;
+
     /** List of connected publishers on this topic w/ their stats. */
     public List<PublisherStats> publishers;
 
@@ -109,6 +112,7 @@ public class TopicStats {
         this.deduplicationStatus = null;
         this.nonContiguousDeletedMessagesRanges = 0;
         this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
+        this.offloadedStorageSize = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -128,6 +132,7 @@ public class TopicStats {
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
         this.backlogSize += stats.backlogSize;
+        this.offloadedStorageSize += stats.offloadedStorageSize;
         this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         if (this.publishers.size() != stats.publishers.size()) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index cdbfcd5..dcfbd20 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -37,6 +37,7 @@ public class PersistentTopicStatsTest {
         topicStats.msgThroughputOut = 1;
         topicStats.averageMsgSize = 1;
         topicStats.storageSize = 1;
+        topicStats.offloadedStorageSize = 1;
         topicStats.publishers.add(new PublisherStats());
         topicStats.subscriptions.put("test_ns", new SubscriptionStats());
         topicStats.replication.put("test_ns", new ReplicatorStats());
@@ -47,6 +48,7 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.msgRateOut, 1.0);
         assertEquals(topicStats.msgThroughputOut, 1.0);
         assertEquals(topicStats.averageMsgSize, 1.0);
+        assertEquals(topicStats.offloadedStorageSize, 1);
         assertEquals(topicStats.storageSize, 1);
         assertEquals(topicStats.publishers.size(), 1);
         assertEquals(topicStats.subscriptions.size(), 1);
@@ -61,6 +63,7 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.publishers.size(), 0);
         assertEquals(topicStats.subscriptions.size(), 0);
         assertEquals(topicStats.replication.size(), 0);
+        assertEquals(topicStats.offloadedStorageSize, 0);
     }
 
     @Test