You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/21 17:05:43 UTC
[pulsar] branch master updated: Add underReplicate state in the
topic internal stats (#10013)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bfba8c8 Add underReplicate state in the topic internal stats (#10013)
bfba8c8 is described below
commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 22 01:04:52 2021 +0800
Add underReplicate state in the topic internal stats (#10013)
* Add underReplicate state in the topic internal stats
* Apply comments.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++
.../org/apache/pulsar/broker/PulsarService.java | 6 ++
.../broker/cache/LocalZooKeeperCacheService.java | 8 +++
.../broker/service/persistent/PersistentTopic.java | 43 +++++++++----
.../stats/client/PulsarBrokerStatsClientTest.java | 5 ++
.../data/PersistentTopicInternalStats.java | 1 +
.../pulsar/zookeeper/ZooKeeperChildrenCache.java | 4 ++
.../pulsar/tests/integration/admin/AdminTest.java | 75 ++++++++++++++++++++++
.../src/test/resources/pulsar-messaging.xml | 1 +
9 files changed, 145 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3e03527..d43b05f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -39,6 +39,7 @@ import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -122,6 +124,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -3735,4 +3738,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
+ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
+ LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+ if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
+ return CompletableFuture.completedFuture(Collections.emptySet());
+ }
+
+ return getLedgerHandle(ledgerId).thenCompose(lh -> {
+ Set<BookieId> ensembles = new HashSet<>();
+ lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
+ return CompletableFuture.completedFuture(ensembles);
+ });
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 86630bb..b3c0aec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -1503,6 +1504,10 @@ public class PulsarService implements AutoCloseable {
return coordinationService;
}
+ public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
+ return this.localZkCacheService.availableBookiesCache().getAsync();
+ }
+
public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig,
String workerConfigFile) throws IOException {
WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile);
@@ -1550,4 +1555,5 @@ public class PulsarService implements AutoCloseable {
? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
return workerConfig;
}
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 07cc764..e1970de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.CreateMode;
@@ -48,6 +49,7 @@ public class LocalZooKeeperCacheService {
private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
public static final String OWNER_INFO_ROOT = "/namespace";
public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+ public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
private final ZooKeeperCache cache;
@@ -55,6 +57,7 @@ public class LocalZooKeeperCacheService {
private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
+ private ZooKeeperChildrenCache availableBookiesCache;
private ConfigurationCacheService configurationCacheService;
@@ -119,6 +122,7 @@ public class LocalZooKeeperCacheService {
this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
this.resourceQuotaCache = new ResourceQuotaCache(cache);
this.resourceQuotaCache.initZK();
+ this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
}
private void initZK() throws PulsarServerException {
@@ -245,6 +249,10 @@ public class LocalZooKeeperCacheService {
return this.managedLedgerListCache;
}
+ public ZooKeeperChildrenCache availableBookiesCache() {
+ return this.availableBookiesCache;
+ }
+
public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index deccca4..ab4483a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -1775,20 +1777,35 @@ public class PersistentTopic extends AbstractTopic
stats.ledgers = Lists.newArrayList();
List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
- ml.getLedgersInfo().forEach((id, li) -> {
- LedgerInfo info = new LedgerInfo();
- info.ledgerId = li.getLedgerId();
- info.entries = li.getEntries();
- info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
- stats.ledgers.add(info);
- if (futures != null) {
- futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
- if (ex == null) {
- info.metadata = lMetadata;
+ CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
+ availableBookiesFuture.whenComplete((bookies, e) -> {
+ if (e != null) {
+ log.error("[{}] Failed to fetch available bookies.", topic, e);
+ statFuture.completeExceptionally(e);
+ } else {
+ ml.getLedgersInfo().forEach((id, li) -> {
+ LedgerInfo info = new LedgerInfo();
+ info.ledgerId = li.getLedgerId();
+ info.entries = li.getEntries();
+ info.size = li.getSize();
+ info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+ stats.ledgers.add(info);
+ if (futures != null) {
+ futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+ if (ex == null) {
+ info.metadata = lMetadata;
+ }
+ return null;
+ }));
+ futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+ if (ex == null) {
+ info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+ .collect(Collectors.toList()));
+ }
+ return null;
+ }));
}
- return null;
- }));
+ });
}
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 938f721..576cbdf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -27,6 +27,7 @@ import java.net.URL;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;
+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -124,6 +125,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
assertNotNull(internalStats.ledgers.get(0).metadata);
+ // For the mock test, the default ensembles is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
+ // The registed bookie ID is 192.168.1.1:5000
+ assertTrue(internalStats.ledgers.get(0).underReplicated);
+
CursorStats cursor = internalStats.cursors.get(subscriptionName);
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index c9b5ae5..47b777c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -57,6 +57,7 @@ public class PersistentTopicInternalStats {
public long size;
public boolean offloaded;
public String metadata;
+ public boolean underReplicated;
}
/**
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 3cc381f..6b05f41 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -66,6 +66,10 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
return children;
}
+ public CompletableFuture<Set<String>> getAsync() {
+ return getAsync(this.path);
+ }
+
public CompletableFuture<Set<String>> getAsync(String path) {
return cache.getChildrenAsync(path, this);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
new file mode 100644
index 0000000..3400b69
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.admin;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for Pulsar Admin.
+ */
+@Slf4j
+public class AdminTest extends MessagingBase {
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
+
+ String topicName = getNonPartitionedTopic("replicated-state", true);
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(adminUrl.get())
+ .build();
+
+ @Cleanup
+ final PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl.get())
+ .build();
+
+ @Cleanup
+ final Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+
+ log.info("Successfully to publish 10 messages to {}", topicName);
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ Assert.assertTrue(stats.ledgers.size() > 0);
+ for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
+ Assert.assertFalse(ledger.underReplicated);
+ }
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 766d790..e0ac367 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -26,6 +26,7 @@
<class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
+ <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
</classes>
</test>
</suite>
\ No newline at end of file