You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/29 09:08:49 UTC
[pulsar] 02/02: Add underReplicate state in the topic internal stats (#10013)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af6c2fa5659185193fa4af413aa5cea0007b81cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 22 01:04:52 2021 +0800
Add underReplicate state in the topic internal stats (#10013)
* Add underReplicate state in the topic internal stats
* Apply comments.
(cherry picked from commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++
.../org/apache/pulsar/broker/PulsarService.java | 7 +-
.../broker/cache/LocalZooKeeperCacheService.java | 10 ++-
.../broker/service/persistent/PersistentTopic.java | 43 +++++++++----
.../java/org/apache/pulsar/schema/SchemaTest.java | 2 +-
.../stats/client/PulsarBrokerStatsClientTest.java | 22 ++++---
.../data/PersistentTopicInternalStats.java | 1 +
.../pulsar/zookeeper/ZooKeeperChildrenCache.java | 4 ++
.../pulsar/tests/integration/admin/AdminTest.java | 75 ++++++++++++++++++++++
.../src/test/resources/pulsar-messaging.xml | 1 +
10 files changed, 154 insertions(+), 26 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 66d4c71..c0a7dd4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -39,6 +39,7 @@ import io.netty.util.Recycler.Handle;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -3532,4 +3535,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
+ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
+ LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+ if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
+ return CompletableFuture.completedFuture(Collections.emptySet());
+ }
+
+ return getLedgerHandle(ledgerId).thenCompose(lh -> {
+ Set<BookieId> ensembles = new HashSet<>();
+ lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
+ return CompletableFuture.completedFuture(ensembles);
+ });
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ef8df6f..5c79909 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,8 +43,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -1366,4 +1367,8 @@ public class PulsarService implements AutoCloseable {
public Optional<Integer> getBrokerListenPortTls() {
return brokerService.getListenPortTls();
}
+
+ public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
+ return this.localZkCacheService.availableBookiesCache().getAsync();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 48b3f88..cf21f02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,8 +34,9 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -50,6 +51,7 @@ public class LocalZooKeeperCacheService {
private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
public static final String OWNER_INFO_ROOT = "/namespace";
public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+ public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
private final ZooKeeperCache cache;
@@ -57,6 +59,7 @@ public class LocalZooKeeperCacheService {
private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
+ private ZooKeeperChildrenCache availableBookiesCache;
private ConfigurationCacheService configurationCacheService;
@@ -121,6 +124,7 @@ public class LocalZooKeeperCacheService {
this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
this.resourceQuotaCache = new ResourceQuotaCache(cache);
this.resourceQuotaCache.initZK();
+ this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
}
private void initZK() throws PulsarServerException {
@@ -248,6 +252,10 @@ public class LocalZooKeeperCacheService {
return this.managedLedgerListCache;
}
+ public ZooKeeperChildrenCache availableBookiesCache() {
+ return this.availableBookiesCache;
+ }
+
public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 85ddafb..d301443 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -1673,20 +1675,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.ledgers = Lists.newArrayList();
List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
- ml.getLedgersInfo().forEach((id, li) -> {
- LedgerInfo info = new LedgerInfo();
- info.ledgerId = li.getLedgerId();
- info.entries = li.getEntries();
- info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
- stats.ledgers.add(info);
- if (futures != null) {
- futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
- if (ex == null) {
- info.metadata = lMetadata;
+ CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
+ availableBookiesFuture.whenComplete((bookies, e) -> {
+ if (e != null) {
+ log.error("[{}] Failed to fetch available bookies.", topic, e);
+ statFuture.completeExceptionally(e);
+ } else {
+ ml.getLedgersInfo().forEach((id, li) -> {
+ LedgerInfo info = new LedgerInfo();
+ info.ledgerId = li.getLedgerId();
+ info.entries = li.getEntries();
+ info.size = li.getSize();
+ info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+ stats.ledgers.add(info);
+ if (futures != null) {
+ futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+ if (ex == null) {
+ info.metadata = lMetadata;
+ }
+ return null;
+ }));
+ futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+ if (ex == null) {
+ info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+ .collect(Collectors.toList()));
+ }
+ return null;
+ }));
}
- return null;
- }));
+ });
}
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d72cbf1..4ffdcad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -260,7 +260,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
.getSchemaLedgerList(TopicName.get(topic).getSchemaName());
assertEquals(ledgers.size(), 2);
admin.topics().delete(topic, true, true);
- assertEquals(this.pulsar.getSchemaRegistryService()
+ assertEquals(this.pulsar.getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0);
for (Long ledger : ledgers) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 0bdb2fe..6f3c261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -18,6 +18,14 @@
*/
package org.apache.pulsar.stats.client;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.ServerErrorException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -39,16 +47,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.ws.rs.ClientErrorException;
-import javax.ws.rs.ServerErrorException;
-import java.net.URL;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
@BeforeMethod
@@ -124,6 +122,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
assertNotNull(internalStats.ledgers.get(0).metadata);
+ // For the mock test, the default ensemble is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
+ // The registered bookie ID is 192.168.1.1:5000
+ assertTrue(internalStats.ledgers.get(0).underReplicated);
+
CursorStats cursor = internalStats.cursors.get(subscriptionName);
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 4bd61c0..f1306be 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -57,6 +57,7 @@ public class PersistentTopicInternalStats {
public long size;
public boolean offloaded;
public String metadata;
+ public boolean underReplicated;
}
/**
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 3cc381f..6b05f41 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -66,6 +66,10 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
return children;
}
+ public CompletableFuture<Set<String>> getAsync() {
+ return getAsync(this.path);
+ }
+
public CompletableFuture<Set<String>> getAsync(String path) {
return cache.getChildrenAsync(path, this);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
new file mode 100644
index 0000000..3400b69
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.admin;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for Pulsar Admin.
+ */
+@Slf4j
+public class AdminTest extends MessagingBase {
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
+
+ String topicName = getNonPartitionedTopic("replicated-state", true);
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(adminUrl.get())
+ .build();
+
+ @Cleanup
+ final PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl.get())
+ .build();
+
+ @Cleanup
+ final Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertNotNull(messageId);
+ }
+
+ log.info("Successfully to publish 10 messages to {}", topicName);
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ Assert.assertTrue(stats.ledgers.size() > 0);
+ for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
+ Assert.assertFalse(ledger.underReplicated);
+ }
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 9045413..feb1cce 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -26,6 +26,7 @@
<class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
+ <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
</classes>
</test>
</suite>