You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/08 00:13:50 UTC

[pulsar] branch master updated: Report compacted topic ledger info when calling get internal stats. (#7988)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ba09e  Report compacted topic ledger info when calling get internal stats. (#7988)
97ba09e is described below

commit 97ba09e63be256412ff013611dca1f000cff9b9a
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Mon Sep 7 17:13:25 2020 -0700

    Report compacted topic ledger info when calling get internal stats. (#7988)
    
    Fixes #7895
    
    ### Motivation
    For get-internal-stats of the persistent topic admin CLI (https://pulsar.apache.org/docs/en/2.6.0/admin-api-persistent-topics/#get-internal-stats), we can also return the compacted topic ledger id if compaction is enabled. This makes it possible to read from the compacted ledger without creating an additional subscription, which benefits use cases such as querying a compacted topic from Pulsar SQL.
    
    ### Modifications
    Expose CompactedTopicContext from CompactedTopicImpl, and in PersistentTopic try to get the ledger information of the compacted topic ledger if it exists.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    - Added unit test to verify correct compacted ledger info is returned after compaction.
    
    
    * Report compacted topic ledger info when calling get internal stats.
    
    * Update documentation to add information about returning the compacted topic ledger when calling get-internal-stats.
---
 .../broker/service/persistent/PersistentTopic.java  | 21 +++++++++++++++++++++
 .../pulsar/compaction/CompactedTopicImpl.java       | 14 +++++++++++++-
 .../apache/pulsar/compaction/CompactionTest.java    |  7 +++++++
 .../policies/data/PersistentTopicInternalStats.java |  3 +++
 site2/docs/admin-api-persistent-topics.md           | 19 ++++++++++++++++++-
 5 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8ff51f6..becd094 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1605,6 +1606,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             stats.ledgers.add(info);
         });
 
+        // Add ledger info for compacted topic ledger if exist.
+        LedgerInfo info = new LedgerInfo();
+        info.ledgerId = -1;
+        info.entries = -1;
+        info.size = -1;
+
+        try {
+            Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
+                    ((CompactedTopicImpl)compactedTopic).getCompactedTopicContext();
+            if (compactedTopicContext.isPresent()) {
+                CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
+                info.ledgerId = ledgerContext.getLedger().getId();
+                info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
+                info.size = ledgerContext.getLedger().getLength();
+            }
+        } catch (ExecutionException | InterruptedException e) {
+            log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
+        }
+        stats.compactedLedger = info;
+
         stats.cursors = Maps.newTreeMap();
         ml.getCursors().forEach(c -> {
             ManagedCursorImpl cursor = (ManagedCursorImpl) c;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2d430a7..74d24c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -28,8 +28,11 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
+import lombok.Getter;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -250,7 +253,8 @@ public class CompactedTopicImpl implements CompactedTopic {
                 });
     }
 
-    static class CompactedTopicContext {
+    @Getter
+    public static class CompactedTopicContext {
         final LedgerHandle ledger;
         final AsyncLoadingCache<Long,MessageIdData> cache;
 
@@ -260,6 +264,14 @@ public class CompactedTopicImpl implements CompactedTopic {
         }
     }
 
+    /**
+     * Getter for CompactedTopicContext.
+     * @return CompactedTopicContext
+     */
+    public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
+        return compactedTopicContext == null? Optional.empty() : Optional.of(compactedTopicContext.get());
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 2505a6e..4ef450d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -132,6 +133,12 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
         compactor.compact(topic).get();
 
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic);
+        // Compacted topic ledger should have same number of entry equals to number of unique key.
+        Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries);
+        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index aa9c595..63fd3cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -44,6 +44,9 @@ public class PersistentTopicInternalStats {
     public List<LedgerInfo> ledgers;
     public Map<String, CursorStats> cursors;
 
+    // LedgerInfo for compacted topic if exist.
+    public LedgerInfo compactedLedger;
+
     /**
      * Ledger information.
      */
diff --git a/site2/docs/admin-api-persistent-topics.md b/site2/docs/admin-api-persistent-topics.md
index 340e7c7..df950f9 100644
--- a/site2/docs/admin-api-persistent-topics.md
+++ b/site2/docs/admin-api-persistent-topics.md
@@ -364,6 +364,16 @@ It shows detailed statistics of a topic.
 
       -   **offloaded**: Whether this ledger is offloaded
 
+  -   **compactedLedger**: The ledger holding un-acked messages after topic compaction.
+ 
+      -   **ledgerId**: Id of this ledger
+     
+      -   **entries**: Total number of entries belong to this ledger
+     
+      -   **size**: Size of messages written to this ledger (in bytes)
+     
+      -   **offloaded**: Will always be false for compacted topic ledger.
+      
   -   **cursors**: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.
 
       -   **markDeletePosition**: All of messages before the markDeletePosition are acknowledged by the subscriber.
@@ -403,9 +413,16 @@ It shows detailed statistics of a topic.
         {
             "ledgerId": 324711539,
             "entries": 0,
-            "size": 0
+            "size": 0,
+            "offloaded": true
         }
     ],
+    "compactedLedger": {
+        "ledgerId": 324711540,
+        "entries": 10,
+        "size": 100,
+        "offloaded": false
+    },
     "cursors": {
         "my-subscription": {
             "markDeletePosition": "324711539:3133",