You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/29 09:08:49 UTC

[pulsar] 02/02: Add underReplicate state in the topic internal stats (#10013)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af6c2fa5659185193fa4af413aa5cea0007b81cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 22 01:04:52 2021 +0800

    Add underReplicate state in the topic internal stats (#10013)
    
    * Add underReplicate state in the topic internal stats
    
    * Apply comments.
    
    (cherry picked from commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++
 .../org/apache/pulsar/broker/PulsarService.java    |  7 +-
 .../broker/cache/LocalZooKeeperCacheService.java   | 10 ++-
 .../broker/service/persistent/PersistentTopic.java | 43 +++++++++----
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  2 +-
 .../stats/client/PulsarBrokerStatsClientTest.java  | 22 ++++---
 .../data/PersistentTopicInternalStats.java         |  1 +
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 ++
 .../pulsar/tests/integration/admin/AdminTest.java  | 75 ++++++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |  1 +
 10 files changed, 154 insertions(+), 26 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 66d4c71..c0a7dd4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -39,6 +39,7 @@ import io.netty.util.Recycler.Handle;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -3532,4 +3535,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    /** Returns all bookies in the ledger's ensembles, or an empty set if it has an offload context. */
+    public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
+        LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+        if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
+            return CompletableFuture.completedFuture(Collections.emptySet());
+        }
+        // The mapping is synchronous, so thenApply is enough; no nested future is needed.
+        return getLedgerHandle(ledgerId).thenApply(lh -> {
+            Set<BookieId> ensembles = new HashSet<>();
+            lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
+            return ensembles;
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ef8df6f..5c79909 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,8 +43,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -1366,4 +1367,8 @@ public class PulsarService implements AutoCloseable {
     public Optional<Integer> getBrokerListenPortTls() {
         return brokerService.getListenPortTls();
     }
+
+    public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
+        return this.localZkCacheService.availableBookiesCache().getAsync();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 48b3f88..cf21f02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,8 +34,9 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -50,6 +51,7 @@ public class LocalZooKeeperCacheService {
     private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
     public static final String OWNER_INFO_ROOT = "/namespace";
     public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+    public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
 
     private final ZooKeeperCache cache;
 
@@ -57,6 +59,7 @@ public class LocalZooKeeperCacheService {
     private ZooKeeperManagedLedgerCache managedLedgerListCache;
     private ResourceQuotaCache resourceQuotaCache;
     private ZooKeeperDataCache<LocalPolicies> policiesCache;
+    private ZooKeeperChildrenCache availableBookiesCache;
 
     private ConfigurationCacheService configurationCacheService;
 
@@ -121,6 +124,7 @@ public class LocalZooKeeperCacheService {
         this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
         this.resourceQuotaCache = new ResourceQuotaCache(cache);
         this.resourceQuotaCache.initZK();
+        this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
     }
 
     private void initZK() throws PulsarServerException {
@@ -248,6 +252,10 @@ public class LocalZooKeeperCacheService {
         return this.managedLedgerListCache;
     }
 
+    public ZooKeeperChildrenCache availableBookiesCache() {
+        return this.availableBookiesCache;
+    }
+
     public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
         return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85ddafb..d301443 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -1673,20 +1675,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         stats.ledgers = Lists.newArrayList();
         List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
-        ml.getLedgersInfo().forEach((id, li) -> {
-            LedgerInfo info = new LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (futures != null) {
-                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
+        CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
+        availableBookiesFuture.whenComplete((bookies, e) -> {
+            if (e != null) {
+                log.error("[{}] Failed to fetch available bookies.", topic, e);
+                statFuture.completeExceptionally(e);
+            } else {
+                ml.getLedgersInfo().forEach((id, li) -> {
+                    LedgerInfo info = new LedgerInfo();
+                    info.ledgerId = li.getLedgerId();
+                    info.entries = li.getEntries();
+                    info.size = li.getSize();
+                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                    stats.ledgers.add(info);
+                    if (futures != null) {
+                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                            if (ex == null) {
+                                info.metadata = lMetadata;
+                            }
+                            return null;
+                        }));
+                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                            if (ex == null) {
+                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                        .collect(Collectors.toList()));
+                            }
+                            return null;
+                        }));
                     }
-                    return null;
-                }));
+                });
             }
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d72cbf1..4ffdcad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -260,7 +260,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .getSchemaLedgerList(TopicName.get(topic).getSchemaName());
         assertEquals(ledgers.size(), 2);
         admin.topics().delete(topic, true, true);
-        assertEquals(this.pulsar.getSchemaRegistryService()
+        assertEquals(this.pulsar.getSchemaRegistryService()
                 .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0);
 
         for (Long ledger : ledgers) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 0bdb2fe..6f3c261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.stats.client;
 
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.ServerErrorException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -39,16 +47,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.ClientErrorException;
-import javax.ws.rs.ServerErrorException;
-import java.net.URL;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
 public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
 
     @BeforeMethod
@@ -124,6 +122,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
         assertNotNull(internalStats.ledgers.get(0).metadata);
+        // For the mock test, the default ensemble is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
+        // The registered bookie ID is 192.168.1.1:5000
+        assertTrue(internalStats.ledgers.get(0).underReplicated);
+
         CursorStats cursor = internalStats.cursors.get(subscriptionName);
         assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
         assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 4bd61c0..f1306be 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -57,6 +57,7 @@ public class PersistentTopicInternalStats {
         public long size;
         public boolean offloaded;
         public String metadata;
+        public boolean underReplicated;
     }
 
     /**
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 3cc381f..6b05f41 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -66,6 +66,10 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
         return children;
     }
 
+    public CompletableFuture<Set<String>> getAsync() {
+        return getAsync(this.path);
+    }
+
     public CompletableFuture<Set<String>> getAsync(String path) {
         return cache.getChildrenAsync(path, this);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
new file mode 100644
index 0000000..3400b69
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.admin;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for Pulsar Admin.
+ */
+@Slf4j
+public class AdminTest extends MessagingBase {
+
+    @Test(dataProvider = "ServiceAndAdminUrls")
+    public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
+
+        String topicName = getNonPartitionedTopic("replicated-state", true);
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl.get())
+                .build();
+
+        @Cleanup
+        final PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl.get())
+                .build();
+
+        @Cleanup
+        final Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+            assertNotNull(messageId);
+        }
+
+        log.info("Successfully to publish 10 messages to {}", topicName);
+        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+        Assert.assertTrue(stats.ledgers.size() > 0);
+        for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
+            Assert.assertFalse(ledger.underReplicated);
+        }
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 9045413..feb1cce 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -26,6 +26,7 @@
             <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
+            <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
         </classes>
     </test>
 </suite>