You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:21 UTC
[pulsar] 27/27: [Pulsar Admin] Expose schema ledger in `topic
stats-internal` (#9284)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9cb6890591a6dbebf67f7cf0dfd3d337735a6a9c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Feb 18 21:33:32 2021 +0800
[Pulsar Admin] Expose schema ledger in `topic stats-internal` (#9284)
## Motivation
Expose schema ledger in `topic stats-internal`
(cherry picked from commit c754f9e90c4ec08a1be744d497ce81fa3dcfd568)
---
.../broker/service/persistent/PersistentTopic.java | 67 ++++++++++-
.../service/schema/BookkeeperSchemaStorage.java | 24 ++++
.../pulsar/broker/admin/AdminApiSchemaTest.java | 125 +++++++++++++++++++++
.../client/impl/BatchMessageIndexAckTest.java | 99 ++++++++++++++++
.../data/PersistentTopicInternalStats.java | 1 +
site2/docs/admin-api-topics.md | 96 ++++++++++------
6 files changed, 375 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a27147e..89075df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -93,6 +94,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -1721,14 +1723,71 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
cs.properties = cursor.getProperties();
stats.cursors.put(cursor.getName(), cs);
});
- if (futures != null) {
- FutureUtil.waitForAll(futures).handle((res, ex) -> {
- statFuture.complete(stats);
+
+ //Schema store ledgers
+ String schemaId;
+ try {
+ schemaId = TopicName.get(topic).getSchemaName();
+ } catch (Throwable t) {
+ statFuture.completeExceptionally(t);
+ return statFuture;
+ }
+
+
+ CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
+ stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>());
+ if (brokerService.getPulsar().getSchemaStorage() != null
+ && brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
+ ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
+ .getStoreLedgerIdsBySchemaId(schemaId)
+ .thenAccept(ledgers -> {
+ List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
+ ledgers.forEach(ledgerId -> {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ getLedgerMetadataFutures.add(completableFuture);
+ brokerService.getPulsar().getBookKeeperClient()
+ .getLedgerMetadata(ledgerId)
+ .thenAccept(metadata -> {
+ LedgerInfo schemaLedgerInfo = new LedgerInfo();
+ schemaLedgerInfo.ledgerId = metadata.getLedgerId();
+ schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
+ schemaLedgerInfo.size = metadata.getLength();
+ if (includeLedgerMetadata) {
+ info.metadata = metadata.toSafeString();
+ }
+ stats.schemaLedgers.add(schemaLedgerInfo);
+ completableFuture.complete(null);
+ }).exceptionally(e -> {
+ completableFuture.completeExceptionally(e);
+ return null;
+ });
+ });
+ FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
+ schemaStoreLedgersFuture.complete(null);
+ }).exceptionally(e -> {
+ schemaStoreLedgersFuture.completeExceptionally(e);
+ return null;
+ });
+ }).exceptionally(e -> {
+ schemaStoreLedgersFuture.completeExceptionally(e);
return null;
});
} else {
- statFuture.complete(stats);
+ schemaStoreLedgersFuture.complete(null);
}
+ schemaStoreLedgersFuture.thenRun(() -> {
+ if (futures != null) {
+ FutureUtil.waitForAll(futures).handle((res, ex) -> {
+ statFuture.complete(stats);
+ return null;
+ });
+ } else {
+ statFuture.complete(stats);
+ }
+ }).exceptionally(e -> {
+ statFuture.completeExceptionally(e);
+ return null;
+ });
return statFuture;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index db948c5..0911ac6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -33,9 +33,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -626,6 +628,28 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return future;
}
+ public CompletableFuture<List<Long>> getStoreLedgerIdsBySchemaId(String schemaId) {
+ CompletableFuture<List<Long>> ledgerIdsFuture = new CompletableFuture<>();
+ getSchemaLocator(getSchemaPath(schemaId)).thenAccept(locator -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get all store schema ledgerIds - locator: {}", schemaId, locator);
+ }
+
+ if (!locator.isPresent()) {
+ ledgerIdsFuture.complete(Collections.emptyList());
+ return;
+ }
+ Set<Long> ledgerIds = new HashSet<>();
+ SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
+ schemaLocator.getIndexList().forEach(indexEntry -> ledgerIds.add(indexEntry.getPosition().getLedgerId()));
+ ledgerIdsFuture.complete(new ArrayList<>(ledgerIds));
+ }).exceptionally(e -> {
+ ledgerIdsFuture.completeExceptionally(e);
+ return null;
+ });
+ return ledgerIdsFuture;
+ }
+
interface Functions {
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 140cd46..d4163b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -20,23 +20,38 @@ package org.apache.pulsar.broker.admin;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -213,4 +228,114 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
assertEquals(schemaInfo, keyValueSchema.getSchemaInfo());
}
+
+ @Test
+ void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
+ String topicName = "persistent://schematest/test/get-schema-ledger-info";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, "test", MessageId.earliest);
+ Schema<Foo> schema = Schema.AVRO(Foo.class);
+ admin.schemas().createSchema(topicName, schema.getSchemaInfo());
+ long ledgerId = 1;
+ long entryId = 10;
+ long length = 10;
+ doReturn(CompletableFuture.completedFuture(new LedgerMetadata() {
+ @Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
+ public int getEnsembleSize() {
+ return 0;
+ }
+
+ @Override
+ public int getWriteQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public int getAckQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public long getLastEntryId() {
+ return entryId;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public boolean hasPassword() {
+ return false;
+ }
+
+ @Override
+ public byte[] getPassword() {
+ return new byte[0];
+ }
+
+ @Override
+ public DigestType getDigestType() {
+ return null;
+ }
+
+ @Override
+ public long getCtime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public Map<String, byte[]> getCustomMetadata() {
+ return null;
+ }
+
+ @Override
+ public List<BookieId> getEnsembleAt(long entryId) {
+ return null;
+ }
+
+ @Override
+ public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+ return null;
+ }
+
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ @Override
+ public String toSafeString() {
+ return "test";
+ }
+
+ @Override
+ public int getMetadataFormatVersion() {
+ return 0;
+ }
+
+ @Override
+ public long getCToken() {
+ return 0;
+ }
+ })).when(mockBookKeeper).getLedgerMetadata(anyLong());
+ PersistentTopicInternalStats persistentTopicInternalStats = admin.topics().getInternalStats(topicName);
+ List<PersistentTopicInternalStats.LedgerInfo> list = persistentTopicInternalStats.schemaLedgers;
+ assertEquals(1, list.size());
+ PersistentTopicInternalStats.LedgerInfo ledgerInfo = list.get(0);
+ assertEquals(ledgerId, ledgerInfo.ledgerId);
+ assertEquals(entryId + 1, ledgerInfo.entries);
+ assertEquals(length, ledgerInfo.size);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 6ff7d23..da76e73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.client.impl;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -37,10 +40,15 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+
@Slf4j
public class BatchMessageIndexAckTest extends ProducerConsumerBase {
@@ -50,6 +58,97 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
super.internalSetup();
super.producerBaseSetup();
+ doReturn(CompletableFuture.completedFuture(new LedgerMetadata() {
+ @Override
+ public long getLedgerId() {
+ return 0;
+ }
+
+ @Override
+ public int getEnsembleSize() {
+ return 0;
+ }
+
+ @Override
+ public int getWriteQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public int getAckQuorumSize() {
+ return 0;
+ }
+
+ @Override
+ public long getLastEntryId() {
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ return 0;
+ }
+
+ @Override
+ public boolean hasPassword() {
+ return false;
+ }
+
+ @Override
+ public byte[] getPassword() {
+ return new byte[0];
+ }
+
+ @Override
+ public DigestType getDigestType() {
+ return null;
+ }
+
+ @Override
+ public long getCtime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public Map<String, byte[]> getCustomMetadata() {
+ return null;
+ }
+
+ @Override
+ public List<BookieId> getEnsembleAt(long entryId) {
+ return null;
+ }
+
+ @Override
+ public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+ return null;
+ }
+
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ @Override
+ public String toSafeString() {
+ return null;
+ }
+
+ @Override
+ public int getMetadataFormatVersion() {
+ return 0;
+ }
+
+ @Override
+ public long getCToken() {
+ return 0;
+ }
+ })).when(mockBookKeeper).getLedgerMetadata(anyLong());
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index e527980..4bd61c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -43,6 +43,7 @@ public class PersistentTopicInternalStats {
public List<LedgerInfo> ledgers;
public Map<String, CursorStats> cursors;
+ public List<LedgerInfo> schemaLedgers;
// LedgerInfo for compacted topic if exist.
public LedgerInfo compactedLedger;
diff --git a/site2/docs/admin-api-topics.md b/site2/docs/admin-api-topics.md
index 5af61ad..6038a42 100644
--- a/site2/docs/admin-api-topics.md
+++ b/site2/docs/admin-api-topics.md
@@ -358,6 +358,20 @@ You can get the detailed statistics of a topic.
- **size**: The size of messages written to this ledger (in bytes).
- **offloaded**: Whether this ledger is offloaded.
+
+ - **metadata**: The ledger metadata.
+
+ - **schemaLedgers**: The ordered list of all ledgers for this topic schema.
+
+ - **ledgerId**: The ID of this ledger.
+
+ - **entries**: The total number of entries belong to this ledger.
+
+ - **size**: The size of messages written to this ledger (in bytes).
+
+ - **offloaded**: Whether this ledger is offloaded.
+
+ - **metadata**: The ledger metadata.
- **compactedLedger**: The ledgers holding un-acked messages after topic compaction.
@@ -395,44 +409,60 @@ The following is an example of the detailed statistics of a topic.
```json
{
- "entriesAddedCounter": 20449518,
- "numberOfEntries": 3233,
- "totalSize": 331482,
- "currentLedgerEntries": 3233,
- "currentLedgerSize": 331482,
- "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
- "lastLedgerCreationFailureTimestamp": null,
- "waitingCursorsCount": 1,
- "pendingAddEntriesCount": 0,
- "lastConfirmedEntry": "324711539:3232",
- "state": "LedgerOpened",
- "ledgers": [
+ "entriesAddedCounter":0,
+ "numberOfEntries":0,
+ "totalSize":0,
+ "currentLedgerEntries":0,
+ "currentLedgerSize":0,
+ "lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00",
+ "lastLedgerCreationFailureTimestamp":null,
+ "waitingCursorsCount":0,
+ "pendingAddEntriesCount":0,
+ "lastConfirmedEntry":"3:-1",
+ "state":"LedgerOpened",
+ "ledgers":[
{
- "ledgerId": 324711539,
- "entries": 0,
- "size": 0,
- "offloaded": true
+ "ledgerId":3,
+ "entries":0,
+ "size":0,
+ "offloaded":false,
+ "metadata":null
}
],
- "compactedLedger": {
- "ledgerId": 324711540,
- "entries": 10,
- "size": 100,
- "offloaded": false
+ "cursors":{
+ "test":{
+ "markDeletePosition":"3:-1",
+ "readPosition":"3:-1",
+ "waitingReadOp":false,
+ "pendingReadOps":0,
+ "messagesConsumedCounter":0,
+ "cursorLedger":4,
+ "cursorLedgerLastEntry":1,
+ "individuallyDeletedMessages":"[]",
+ "lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00",
+ "state":"Open",
+ "numberOfEntriesSinceFirstNotAckedMessage":0,
+ "totalNonContiguousDeletedMessagesRange":0,
+ "properties":{
+
+ }
+ }
},
- "cursors": {
- "my-subscription": {
- "markDeletePosition": "324711539:3133",
- "readPosition": "324711539:3233",
- "waitingReadOp": true,
- "pendingReadOps": 0,
- "messagesConsumedCounter": 20449501,
- "cursorLedger": 324702104,
- "cursorLedgerLastEntry": 21,
- "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
- "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
- "state": "Open"
+ "schemaLedgers":[
+ {
+ "ledgerId":1,
+ "entries":11,
+ "size":10,
+ "offloaded":false,
+ "metadata":null
}
+ ],
+ "compactedLedger":{
+ "ledgerId":-1,
+ "entries":-1,
+ "size":-1,
+ "offloaded":false,
+ "metadata":null
}
}
```