You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:21 UTC

[pulsar] 27/27: [Pulsar Admin] Expose schema ledger in `topic stats-internal` (#9284)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9cb6890591a6dbebf67f7cf0dfd3d337735a6a9c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Feb 18 21:33:32 2021 +0800

    [Pulsar Admin] Expose schema ledger in `topic stats-internal` (#9284)
    
    ## Motivation
    Expose the ledgers that store a topic's schema as a new `schemaLedgers` field in the output of `topic stats-internal`.
    
    (cherry picked from commit c754f9e90c4ec08a1be744d497ce81fa3dcfd568)
---
 .../broker/service/persistent/PersistentTopic.java |  67 ++++++++++-
 .../service/schema/BookkeeperSchemaStorage.java    |  24 ++++
 .../pulsar/broker/admin/AdminApiSchemaTest.java    | 125 +++++++++++++++++++++
 .../client/impl/BatchMessageIndexAckTest.java      |  99 ++++++++++++++++
 .../data/PersistentTopicInternalStats.java         |   1 +
 site2/docs/admin-api-topics.md                     |  96 ++++++++++------
 6 files changed, 375 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a27147e..89075df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -93,6 +94,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPolicyListener;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -1721,14 +1723,71 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             cs.properties = cursor.getProperties();
             stats.cursors.put(cursor.getName(), cs);
         });
-        if (futures != null) {
-            FutureUtil.waitForAll(futures).handle((res, ex) -> {
-                statFuture.complete(stats);
+
+        //Schema store ledgers
+        String schemaId;
+        try {
+            schemaId = TopicName.get(topic).getSchemaName();
+        } catch (Throwable t) {
+            statFuture.completeExceptionally(t);
+            return statFuture;
+        }
+
+
+        CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
+        stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>());
+        if (brokerService.getPulsar().getSchemaStorage() != null
+                && brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
+            ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
+                    .getStoreLedgerIdsBySchemaId(schemaId)
+                    .thenAccept(ledgers -> {
+                        List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
+                        ledgers.forEach(ledgerId -> {
+                            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                            getLedgerMetadataFutures.add(completableFuture);
+                            brokerService.getPulsar().getBookKeeperClient()
+                                    .getLedgerMetadata(ledgerId)
+                                    .thenAccept(metadata -> {
+                                        LedgerInfo schemaLedgerInfo = new LedgerInfo();
+                                        schemaLedgerInfo.ledgerId = metadata.getLedgerId();
+                                        schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
+                                        schemaLedgerInfo.size = metadata.getLength();
+                                        if (includeLedgerMetadata) {
+                                            info.metadata = metadata.toSafeString();
+                                        }
+                                        stats.schemaLedgers.add(schemaLedgerInfo);
+                                        completableFuture.complete(null);
+                                    }).exceptionally(e -> {
+                                completableFuture.completeExceptionally(e);
+                                return null;
+                            });
+                        });
+                        FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
+                            schemaStoreLedgersFuture.complete(null);
+                        }).exceptionally(e -> {
+                            schemaStoreLedgersFuture.completeExceptionally(e);
+                            return null;
+                        });
+                    }).exceptionally(e -> {
+                schemaStoreLedgersFuture.completeExceptionally(e);
                 return null;
             });
         } else {
-            statFuture.complete(stats);
+            schemaStoreLedgersFuture.complete(null);
         }
+        schemaStoreLedgersFuture.thenRun(() -> {
+            if (futures != null) {
+                FutureUtil.waitForAll(futures).handle((res, ex) -> {
+                    statFuture.complete(stats);
+                    return null;
+                });
+            } else {
+                statFuture.complete(stats);
+            }
+        }).exceptionally(e -> {
+            statFuture.completeExceptionally(e);
+            return null;
+        });
         return statFuture;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index db948c5..0911ac6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -33,9 +33,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -626,6 +628,28 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return future;
     }
 
+    public CompletableFuture<List<Long>> getStoreLedgerIdsBySchemaId(String schemaId) {
+        CompletableFuture<List<Long>> ledgerIdsFuture = new CompletableFuture<>();
+        getSchemaLocator(getSchemaPath(schemaId)).thenAccept(locator -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Get all store schema ledgerIds - locator: {}", schemaId, locator);
+            }
+
+            if (!locator.isPresent()) {
+                ledgerIdsFuture.complete(Collections.emptyList());
+                return;
+            }
+            Set<Long> ledgerIds = new HashSet<>();
+            SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
+            schemaLocator.getIndexList().forEach(indexEntry -> ledgerIds.add(indexEntry.getPosition().getLedgerId()));
+            ledgerIdsFuture.complete(new ArrayList<>(ledgerIds));
+        }).exceptionally(e -> {
+            ledgerIdsFuture.completeExceptionally(e);
+            return null;
+        });
+        return ledgerIdsFuture;
+    }
+
     interface Functions {
         static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
             final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 140cd46..d4163b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -20,23 +20,38 @@ package org.apache.pulsar.broker.admin;
 
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.StringSchema;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -213,4 +228,114 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(schemaInfo, keyValueSchema.getSchemaInfo());
     }
+
+    @Test
+    void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
+        String topicName = "persistent://schematest/test/get-schema-ledger-info";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, "test", MessageId.earliest);
+        Schema<Foo> schema = Schema.AVRO(Foo.class);
+        admin.schemas().createSchema(topicName, schema.getSchemaInfo());
+        long ledgerId = 1;
+        long entryId = 10;
+        long length = 10;
+        doReturn(CompletableFuture.completedFuture(new LedgerMetadata() {
+            @Override
+            public long getLedgerId() {
+                return ledgerId;
+            }
+
+            @Override
+            public int getEnsembleSize() {
+                return 0;
+            }
+
+            @Override
+            public int getWriteQuorumSize() {
+                return 0;
+            }
+
+            @Override
+            public int getAckQuorumSize() {
+                return 0;
+            }
+
+            @Override
+            public long getLastEntryId() {
+                return entryId;
+            }
+
+            @Override
+            public long getLength() {
+                return length;
+            }
+
+            @Override
+            public boolean hasPassword() {
+                return false;
+            }
+
+            @Override
+            public byte[] getPassword() {
+                return new byte[0];
+            }
+
+            @Override
+            public DigestType getDigestType() {
+                return null;
+            }
+
+            @Override
+            public long getCtime() {
+                return 0;
+            }
+
+            @Override
+            public boolean isClosed() {
+                return false;
+            }
+
+            @Override
+            public Map<String, byte[]> getCustomMetadata() {
+                return null;
+            }
+
+            @Override
+            public List<BookieId> getEnsembleAt(long entryId) {
+                return null;
+            }
+
+            @Override
+            public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+                return null;
+            }
+
+            @Override
+            public State getState() {
+                return null;
+            }
+
+            @Override
+            public String toSafeString() {
+                return "test";
+            }
+
+            @Override
+            public int getMetadataFormatVersion() {
+                return 0;
+            }
+
+            @Override
+            public long getCToken() {
+                return 0;
+            }
+        })).when(mockBookKeeper).getLedgerMetadata(anyLong());
+        PersistentTopicInternalStats persistentTopicInternalStats = admin.topics().getInternalStats(topicName);
+        List<PersistentTopicInternalStats.LedgerInfo> list = persistentTopicInternalStats.schemaLedgers;
+        assertEquals(1, list.size());
+        PersistentTopicInternalStats.LedgerInfo ledgerInfo = list.get(0);
+        assertEquals(ledgerId, ledgerInfo.ledgerId);
+        assertEquals(entryId + 1, ledgerInfo.entries);
+        assertEquals(length, ledgerInfo.size);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 6ff7d23..da76e73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.client.impl;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -37,10 +40,15 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+
 @Slf4j
 public class BatchMessageIndexAckTest extends ProducerConsumerBase {
 
@@ -50,6 +58,97 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         super.internalSetup();
         super.producerBaseSetup();
+        doReturn(CompletableFuture.completedFuture(new LedgerMetadata() {
+            @Override
+            public long getLedgerId() {
+                return 0;
+            }
+
+            @Override
+            public int getEnsembleSize() {
+                return 0;
+            }
+
+            @Override
+            public int getWriteQuorumSize() {
+                return 0;
+            }
+
+            @Override
+            public int getAckQuorumSize() {
+                return 0;
+            }
+
+            @Override
+            public long getLastEntryId() {
+                return 0;
+            }
+
+            @Override
+            public long getLength() {
+                return 0;
+            }
+
+            @Override
+            public boolean hasPassword() {
+                return false;
+            }
+
+            @Override
+            public byte[] getPassword() {
+                return new byte[0];
+            }
+
+            @Override
+            public DigestType getDigestType() {
+                return null;
+            }
+
+            @Override
+            public long getCtime() {
+                return 0;
+            }
+
+            @Override
+            public boolean isClosed() {
+                return false;
+            }
+
+            @Override
+            public Map<String, byte[]> getCustomMetadata() {
+                return null;
+            }
+
+            @Override
+            public List<BookieId> getEnsembleAt(long entryId) {
+                return null;
+            }
+
+            @Override
+            public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+                return null;
+            }
+
+            @Override
+            public State getState() {
+                return null;
+            }
+
+            @Override
+            public String toSafeString() {
+                return null;
+            }
+
+            @Override
+            public int getMetadataFormatVersion() {
+                return 0;
+            }
+
+            @Override
+            public long getCToken() {
+                return 0;
+            }
+        })).when(mockBookKeeper).getLedgerMetadata(anyLong());
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index e527980..4bd61c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -43,6 +43,7 @@ public class PersistentTopicInternalStats {
 
     public List<LedgerInfo> ledgers;
     public Map<String, CursorStats> cursors;
+    public List<LedgerInfo> schemaLedgers;
 
     // LedgerInfo for compacted topic if exist.
     public LedgerInfo compactedLedger;
diff --git a/site2/docs/admin-api-topics.md b/site2/docs/admin-api-topics.md
index 5af61ad..6038a42 100644
--- a/site2/docs/admin-api-topics.md
+++ b/site2/docs/admin-api-topics.md
@@ -358,6 +358,20 @@ You can get the detailed statistics of a topic.
       -   **size**: The size of messages written to this ledger (in bytes).
 
       -   **offloaded**: Whether this ledger is offloaded.
+      
+      -   **metadata**: The ledger metadata.
+
+  -   **schemaLedgers**: The list of all ledgers that store the schema of this topic (in no guaranteed order).
+  
+      -   **ledgerId**: The ID of this ledger.
+  
+      -   **entries**: The total number of entries that belong to this ledger.
+  
+      -   **size**: The total size of the schema data written to this ledger (in bytes).
+  
+      -   **offloaded**: Whether this ledger is offloaded.
+      
+      -   **metadata**: The ledger metadata.
 
   -   **compactedLedger**: The ledgers holding un-acked messages after topic compaction.
  
@@ -395,44 +409,60 @@ The following is an example of the detailed statistics of a topic.
 
 ```json
 {
-    "entriesAddedCounter": 20449518,
-    "numberOfEntries": 3233,
-    "totalSize": 331482,
-    "currentLedgerEntries": 3233,
-    "currentLedgerSize": 331482,
-    "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
-    "lastLedgerCreationFailureTimestamp": null,
-    "waitingCursorsCount": 1,
-    "pendingAddEntriesCount": 0,
-    "lastConfirmedEntry": "324711539:3232",
-    "state": "LedgerOpened",
-    "ledgers": [
+    "entriesAddedCounter":0,
+    "numberOfEntries":0,
+    "totalSize":0,
+    "currentLedgerEntries":0,
+    "currentLedgerSize":0,
+    "lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
+    "lastLedgerCreationFailureTimestamp":null,
+    "waitingCursorsCount":0,
+    "pendingAddEntriesCount":0,
+    "lastConfirmedEntry":"3:-1",
+    "state":"LedgerOpened",
+    "ledgers":[
         {
-            "ledgerId": 324711539,
-            "entries": 0,
-            "size": 0,
-            "offloaded": true
+            "ledgerId":3,
+            "entries":0,
+            "size":0,
+            "offloaded":false,
+            "metadata":null
         }
     ],
-    "compactedLedger": {
-        "ledgerId": 324711540,
-        "entries": 10,
-        "size": 100,
-        "offloaded": false
+    "cursors":{
+        "test":{
+            "markDeletePosition":"3:-1",
+            "readPosition":"3:-1",
+            "waitingReadOp":false,
+            "pendingReadOps":0,
+            "messagesConsumedCounter":0,
+            "cursorLedger":4,
+            "cursorLedgerLastEntry":1,
+            "individuallyDeletedMessages":"[]",
+            "lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
+            "state":"Open",
+            "numberOfEntriesSinceFirstNotAckedMessage":0,
+            "totalNonContiguousDeletedMessagesRange":0,
+            "properties":{
+
+            }
+        }
     },
-    "cursors": {
-        "my-subscription": {
-            "markDeletePosition": "324711539:3133",
-            "readPosition": "324711539:3233",
-            "waitingReadOp": true,
-            "pendingReadOps": 0,
-            "messagesConsumedCounter": 20449501,
-            "cursorLedger": 324702104,
-            "cursorLedgerLastEntry": 21,
-            "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
-            "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
-            "state": "Open"
+    "schemaLedgers":[
+        {
+            "ledgerId":1,
+            "entries":11,
+            "size":10,
+            "offloaded":false,
+            "metadata":null
         }
+    ],
+    "compactedLedger":{
+        "ledgerId":-1,
+        "entries":-1,
+        "size":-1,
+        "offloaded":false,
+        "metadata":null
     }
 }
 ```