You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/29 09:08:47 UTC

[pulsar] branch branch-2.7 updated (c941860 -> af6c2fa)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from c941860  [Security] Upgrade commons-io to address CVE-2021-29425 (#10287)
     new 26c2837  Fix schema ledger deletion when deleting topic with delete schema. (#10383)
     new af6c2fa  Add underReplicate state in the topic internal stats (#10013)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++++
 .../org/apache/pulsar/broker/PulsarService.java    |  7 +-
 .../broker/cache/LocalZooKeeperCacheService.java   | 10 ++-
 .../broker/service/persistent/PersistentTopic.java | 43 +++++++----
 .../service/schema/BookkeeperSchemaStorage.java    | 77 ++++++++++----------
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 83 +++++++++++++++++++---
 .../stats/client/PulsarBrokerStatsClientTest.java  | 22 +++---
 .../data/PersistentTopicInternalStats.java         |  1 +
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 ++
 .../pulsar/tests/integration/admin/AdminTest.java  | 75 +++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |  1 +
 11 files changed, 267 insertions(+), 71 deletions(-)
 create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java

[pulsar] 01/02: Fix schema ledger deletion when deleting topic with delete schema. (#10383)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 26c2837cb9e374942494e7f32f2d4d8f6df480f6
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 27 23:57:34 2021 +0800

    Fix schema ledger deletion when deleting topic with delete schema. (#10383)
    
    * Fix schema ledger deletion when delete topic with delete schema.
    
    * Revert public
    
    * Apply comments.
    
    * Apply comment.
    
    * Fix checkstyle.
    
    * Fix test
    
    (cherry picked from commit a22782490bb9a17411b749326d1f084b096998c8)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 77 ++++++++++----------
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 83 +++++++++++++++++++---
 2 files changed, 114 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 2ecc927..521db90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
@@ -83,10 +82,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-    // schemaId => ledgers of the schemaId
-    private final Map<String, List<Long>> schemaLedgers = new ConcurrentHashMap<>();
-
-    private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations =
+            new ConcurrentHashMap<>();
 
     @VisibleForTesting
     BookkeeperSchemaStorage(PulsarService pulsar) {
@@ -160,7 +157,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return result;
     }
 
-    private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+    CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
         return getSchemaLocator(getSchemaPath(key));
     }
 
@@ -168,8 +165,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         localZkCache.invalidate(getSchemaPath(key));
     }
 
-    @VisibleForTesting
-    List<Long> getSchemaLedgerList(String key) throws IOException {
+    public List<Long> getSchemaLedgerList(String key) throws IOException {
         Optional<LocatorEntry> locatorEntry = null;
         try {
             locatorEntry = getLocator(key).get();
@@ -390,33 +386,44 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             } else {
                 // The version is only for the compatibility of the current interface
                 final long version = -1;
-                final List<Long> ledgerIds = schemaLedgers.get(schemaId);
-                if (ledgerIds != null) {
-                    CompletableFuture<Long> future = new CompletableFuture<>();
-                    final AtomicInteger numOfLedgerIds = new AtomicInteger(ledgerIds.size());
-                    for (long ledgerId : ledgerIds) {
-                        bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
-                            if (rc != BKException.Code.OK) {
-                                // It's not a serious error, we didn't need call future.completeExceptionally()
-                                log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc);
-                            }
-                            if (numOfLedgerIds.decrementAndGet() == 0) {
-                                try {
-                                    ZkUtils.deleteFullPathOptimistic(zooKeeper, getSchemaPath(schemaId), -1);
-                                } catch (InterruptedException | KeeperException e) {
-                                    future.completeExceptionally(e);
+                CompletableFuture<Long> future = new CompletableFuture<>();
+                getLocator(schemaId).whenComplete((locator, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                    } else {
+                        if (!locator.isPresent()) {
+                            future.complete(null);
+                            return;
+                        }
+                        List<SchemaStorageFormat.IndexEntry> indexEntryList = locator.get().locator.getIndexList();
+                        List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(indexEntryList.size());
+                        indexEntryList.forEach(indexEntry -> {
+                            final long ledgerId = indexEntry.getPosition().getLedgerId();
+                            CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+                            deleteFutures.add(deleteFuture);
+                            bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
+                                if (rc != BKException.Code.OK) {
+                                    // It's not a serious error, we don't need to call future.completeExceptionally()
+                                    log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc);
                                 }
-                                clearLocatorCache(getSchemaPath(schemaId));
-                                future.complete(version);
-                            }
-                        }, null);
+                                deleteFuture.complete(null);
+                            }, null);
+                        });
+                        FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> {
+                            final String path = getSchemaPath(schemaId);
+                            ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> {
+                                if (rc != Code.OK.intValue()) {
+                                    future.completeExceptionally(KeeperException.create(Code.get(rc)));
+                                } else {
+                                    clearLocatorCache(getSchemaPath(schemaId));
+                                    future.complete(version);
+                                }
+                            }, path);
+
+                        });
                     }
-                    return future;
-                } else {
-                    // It should never reach here
-                    log.warn("No ledgers for schema id: {}", schemaId);
-                    return completedFuture(version);
-                }
+                });
+                return future;
             }
         });
     }
@@ -578,10 +585,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                         if (rc != BKException.Code.OK) {
                             future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
                         } else {
-                            schemaLedgers.computeIfAbsent(
-                                    schemaId,
-                                    key -> Collections.synchronizedList(new ArrayList<>())
-                            ).add(handle.getId());
                             future.complete(handle);
                         }
                     }, null, metadata);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 595da2b..d72cbf1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -20,13 +20,23 @@ package org.apache.pulsar.schema;
 
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
 import java.util.Collections;
-
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -39,14 +49,11 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 public class SchemaTest extends MockedPulsarServiceBaseTest {
 
@@ -206,4 +213,62 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testDeleteTopicAndSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "test-delete-topic-and-schema";
+
+        final String topic = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicName).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME));
+
+        @Cleanup
+        Producer<Schemas.PersonOne> p1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> p2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic)
+                .create();
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures =
+                this.pulsar.getSchemaRegistryService().getAllSchemas(TopicName.get(topic).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas = schemaFutures.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+
+        assertEquals(schemas.size(), 2);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas) {
+            assertNotNull(schema);
+        }
+
+        List<Long> ledgers = ((BookkeeperSchemaStorage)this.pulsar.getSchemaStorage())
+                .getSchemaLedgerList(TopicName.get(topic).getSchemaName());
+        assertEquals(ledgers.size(), 2);
+        admin.topics().delete(topic, true, true);
+        assertEquals(this.pulsar.getSchemaRegistryService()
+                .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0);
+
+        for (Long ledger : ledgers) {
+            try {
+                pulsar.getBookKeeperClient().openLedger(ledger, BookKeeper.DigestType.CRC32, new byte[]{});
+                fail();
+            } catch (BKException.BKNoSuchLedgerExistsException ignore) {
+            }
+        }
+    }
 }

[pulsar] 02/02: Add underReplicate state in the topic internal stats (#10013)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af6c2fa5659185193fa4af413aa5cea0007b81cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 22 01:04:52 2021 +0800

    Add underReplicate state in the topic internal stats (#10013)
    
    * Add underReplicate state in the topic internal stats
    
    * Apply comments.
    
    (cherry picked from commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++
 .../org/apache/pulsar/broker/PulsarService.java    |  7 +-
 .../broker/cache/LocalZooKeeperCacheService.java   | 10 ++-
 .../broker/service/persistent/PersistentTopic.java | 43 +++++++++----
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  2 +-
 .../stats/client/PulsarBrokerStatsClientTest.java  | 22 ++++---
 .../data/PersistentTopicInternalStats.java         |  1 +
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 ++
 .../pulsar/tests/integration/admin/AdminTest.java  | 75 ++++++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |  1 +
 10 files changed, 154 insertions(+), 26 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 66d4c71..c0a7dd4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -39,6 +39,7 @@ import io.netty.util.Recycler.Handle;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -3532,4 +3535,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
+        LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+        if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
+            return CompletableFuture.completedFuture(Collections.emptySet());
+        }
+
+        return getLedgerHandle(ledgerId).thenCompose(lh -> {
+            Set<BookieId> ensembles = new HashSet<>();
+            lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
+            return CompletableFuture.completedFuture(ensembles);
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ef8df6f..5c79909 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,8 +43,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -1366,4 +1367,8 @@ public class PulsarService implements AutoCloseable {
     public Optional<Integer> getBrokerListenPortTls() {
         return brokerService.getListenPortTls();
     }
+
+    public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
+        return this.localZkCacheService.availableBookiesCache().getAsync();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 48b3f88..cf21f02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,8 +34,9 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -50,6 +51,7 @@ public class LocalZooKeeperCacheService {
     private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
     public static final String OWNER_INFO_ROOT = "/namespace";
     public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+    public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
 
     private final ZooKeeperCache cache;
 
@@ -57,6 +59,7 @@ public class LocalZooKeeperCacheService {
     private ZooKeeperManagedLedgerCache managedLedgerListCache;
     private ResourceQuotaCache resourceQuotaCache;
     private ZooKeeperDataCache<LocalPolicies> policiesCache;
+    private ZooKeeperChildrenCache availableBookiesCache;
 
     private ConfigurationCacheService configurationCacheService;
 
@@ -121,6 +124,7 @@ public class LocalZooKeeperCacheService {
         this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
         this.resourceQuotaCache = new ResourceQuotaCache(cache);
         this.resourceQuotaCache.initZK();
+        this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
     }
 
     private void initZK() throws PulsarServerException {
@@ -248,6 +252,10 @@ public class LocalZooKeeperCacheService {
         return this.managedLedgerListCache;
     }
 
+    public ZooKeeperChildrenCache availableBookiesCache() {
+        return this.availableBookiesCache;
+    }
+
     public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
         return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85ddafb..d301443 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -1673,20 +1675,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         stats.ledgers = Lists.newArrayList();
         List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
-        ml.getLedgersInfo().forEach((id, li) -> {
-            LedgerInfo info = new LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (futures != null) {
-                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
+        CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
+        availableBookiesFuture.whenComplete((bookies, e) -> {
+            if (e != null) {
+                log.error("[{}] Failed to fetch available bookies.", topic, e);
+                statFuture.completeExceptionally(e);
+            } else {
+                ml.getLedgersInfo().forEach((id, li) -> {
+                    LedgerInfo info = new LedgerInfo();
+                    info.ledgerId = li.getLedgerId();
+                    info.entries = li.getEntries();
+                    info.size = li.getSize();
+                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                    stats.ledgers.add(info);
+                    if (futures != null) {
+                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                            if (ex == null) {
+                                info.metadata = lMetadata;
+                            }
+                            return null;
+                        }));
+                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                            if (ex == null) {
+                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                        .collect(Collectors.toList()));
+                            }
+                            return null;
+                        }));
                     }
-                    return null;
-                }));
+                });
             }
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d72cbf1..4ffdcad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -260,7 +260,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .getSchemaLedgerList(TopicName.get(topic).getSchemaName());
         assertEquals(ledgers.size(), 2);
         admin.topics().delete(topic, true, true);
-        assertEquals(this.pulsar.getSchemaRegistryService()
+        assertEquals(this.pulsar.getSchemaRegistryService()
                 .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0);
 
         for (Long ledger : ledgers) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 0bdb2fe..6f3c261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.stats.client;
 
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.ServerErrorException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -39,16 +47,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.ClientErrorException;
-import javax.ws.rs.ServerErrorException;
-import java.net.URL;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
 public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
 
     @BeforeMethod
@@ -124,6 +122,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
         assertNotNull(internalStats.ledgers.get(0).metadata);
+        // For the mock test, the default ensembles is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
+        // The registered bookie ID is 192.168.1.1:5000
+        assertTrue(internalStats.ledgers.get(0).underReplicated);
+
         CursorStats cursor = internalStats.cursors.get(subscriptionName);
         assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
         assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 4bd61c0..f1306be 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -57,6 +57,7 @@ public class PersistentTopicInternalStats {
         public long size;
         public boolean offloaded;
         public String metadata;
+        public boolean underReplicated;
     }
 
     /**
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 3cc381f..6b05f41 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -66,6 +66,10 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
         return children;
     }
 
+    public CompletableFuture<Set<String>> getAsync() {
+        return getAsync(this.path);
+    }
+
     public CompletableFuture<Set<String>> getAsync(String path) {
         return cache.getChildrenAsync(path, this);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
new file mode 100644
index 0000000..3400b69
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.admin;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for Pulsar Admin.
+ */
+@Slf4j
+public class AdminTest extends MessagingBase {
+
+    @Test(dataProvider = "ServiceAndAdminUrls")
+    public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
+
+        String topicName = getNonPartitionedTopic("replicated-state", true);
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl.get())
+                .build();
+
+        @Cleanup
+        final PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl.get())
+                .build();
+
+        @Cleanup
+        final Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+            assertNotNull(messageId);
+        }
+
+        log.info("Successfully to publish 10 messages to {}", topicName);
+        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+        Assert.assertTrue(stats.ledgers.size() > 0);
+        for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
+            Assert.assertFalse(ledger.underReplicated);
+        }
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 9045413..feb1cce 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -26,6 +26,7 @@
             <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
+            <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
         </classes>
     </test>
 </suite>