You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/21 17:05:43 UTC

[pulsar] branch master updated: Add underReplicate state in the topic internal stats (#10013)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bfba8c8  Add underReplicate state in the topic internal stats (#10013)
bfba8c8 is described below

commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 22 01:04:52 2021 +0800

    Add underReplicate state in the topic internal stats (#10013)
    
    * Add underReplicate state in the topic internal stats
    
    * Apply comments.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++
 .../org/apache/pulsar/broker/PulsarService.java    |  6 ++
 .../broker/cache/LocalZooKeeperCacheService.java   |  8 +++
 .../broker/service/persistent/PersistentTopic.java | 43 +++++++++----
 .../stats/client/PulsarBrokerStatsClientTest.java  |  5 ++
 .../data/PersistentTopicInternalStats.java         |  1 +
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 ++
 .../pulsar/tests/integration/admin/AdminTest.java  | 75 ++++++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |  1 +
 9 files changed, 145 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3e03527..d43b05f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -39,6 +39,7 @@ import io.netty.util.ReferenceCountUtil;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -122,6 +124,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -3735,4 +3738,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
+    public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
+        LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+        if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
+            return CompletableFuture.completedFuture(Collections.emptySet());
+        }
+
+        return getLedgerHandle(ledgerId).thenCompose(lh -> {
+            Set<BookieId> ensembles = new HashSet<>();
+            lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
+            return CompletableFuture.completedFuture(ensembles);
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 86630bb..b3c0aec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -1503,6 +1504,10 @@ public class PulsarService implements AutoCloseable {
         return coordinationService;
     }
 
+    public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
+        return this.localZkCacheService.availableBookiesCache().getAsync();
+    }
+
     public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig,
                                                                       String workerConfigFile) throws IOException {
         WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile);
@@ -1550,4 +1555,5 @@ public class PulsarService implements AutoCloseable {
                         ? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
         return workerConfig;
     }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 07cc764..e1970de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.CreateMode;
@@ -48,6 +49,7 @@ public class LocalZooKeeperCacheService {
     private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
     public static final String OWNER_INFO_ROOT = "/namespace";
     public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+    public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
 
     private final ZooKeeperCache cache;
 
@@ -55,6 +57,7 @@ public class LocalZooKeeperCacheService {
     private ZooKeeperManagedLedgerCache managedLedgerListCache;
     private ResourceQuotaCache resourceQuotaCache;
     private ZooKeeperDataCache<LocalPolicies> policiesCache;
+    private ZooKeeperChildrenCache availableBookiesCache;
 
     private ConfigurationCacheService configurationCacheService;
 
@@ -119,6 +122,7 @@ public class LocalZooKeeperCacheService {
         this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
         this.resourceQuotaCache = new ResourceQuotaCache(cache);
         this.resourceQuotaCache.initZK();
+        this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
     }
 
     private void initZK() throws PulsarServerException {
@@ -245,6 +249,10 @@ public class LocalZooKeeperCacheService {
         return this.managedLedgerListCache;
     }
 
+    public ZooKeeperChildrenCache availableBookiesCache() {
+        return this.availableBookiesCache;
+    }
+
     public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
         return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index deccca4..ab4483a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -1775,20 +1777,35 @@ public class PersistentTopic extends AbstractTopic
 
         stats.ledgers = Lists.newArrayList();
         List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
-        ml.getLedgersInfo().forEach((id, li) -> {
-            LedgerInfo info = new LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (futures != null) {
-                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
+        CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
+        availableBookiesFuture.whenComplete((bookies, e) -> {
+            if (e != null) {
+                log.error("[{}] Failed to fetch available bookies.", topic, e);
+                statFuture.completeExceptionally(e);
+            } else {
+                ml.getLedgersInfo().forEach((id, li) -> {
+                    LedgerInfo info = new LedgerInfo();
+                    info.ledgerId = li.getLedgerId();
+                    info.entries = li.getEntries();
+                    info.size = li.getSize();
+                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                    stats.ledgers.add(info);
+                    if (futures != null) {
+                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                            if (ex == null) {
+                                info.metadata = lMetadata;
+                            }
+                            return null;
+                        }));
+                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                            if (ex == null) {
+                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                        .collect(Collectors.toList()));
+                            }
+                            return null;
+                        }));
                     }
-                    return null;
-                }));
+                });
             }
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 938f721..576cbdf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.ServerErrorException;
+
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -124,6 +125,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
         PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
         assertNotNull(internalStats.ledgers.get(0).metadata);
+        // For the mock test, the default ensemble is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
+        // The registered bookie ID is 192.168.1.1:5000, so the ledger is expected to be under-replicated
+        assertTrue(internalStats.ledgers.get(0).underReplicated);
+
         CursorStats cursor = internalStats.cursors.get(subscriptionName);
         assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
         assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index c9b5ae5..47b777c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -57,6 +57,7 @@ public class PersistentTopicInternalStats {
         public long size;
         public boolean offloaded;
         public String metadata;
+        public boolean underReplicated;
     }
 
     /**
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 3cc381f..6b05f41 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -66,6 +66,10 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
         return children;
     }
 
+    public CompletableFuture<Set<String>> getAsync() {
+        return getAsync(this.path);
+    }
+
     public CompletableFuture<Set<String>> getAsync(String path) {
         return cache.getChildrenAsync(path, this);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
new file mode 100644
index 0000000..3400b69
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.admin;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for Pulsar Admin.
+ */
+@Slf4j
+public class AdminTest extends MessagingBase {
+
+    @Test(dataProvider = "ServiceAndAdminUrls")
+    public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
+
+        String topicName = getNonPartitionedTopic("replicated-state", true);
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(adminUrl.get())
+                .build();
+
+        @Cleanup
+        final PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl.get())
+                .build();
+
+        @Cleanup
+        final Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+            assertNotNull(messageId);
+        }
+
+        log.info("Successfully to publish 10 messages to {}", topicName);
+        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+        Assert.assertTrue(stats.ledgers.size() > 0);
+        for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
+            Assert.assertFalse(ledger.underReplicated);
+        }
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 766d790..e0ac367 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -26,6 +26,7 @@
             <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
+            <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
         </classes>
     </test>
 </suite>
\ No newline at end of file