You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/01 03:12:28 UTC
[pulsar] 01/10: [Broker] Fix ``Future.join()`` causing deadlock. (#14469)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6f0f9a9b1adb697e7ef9f32700db032ba4b2ce53
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Tue Mar 1 10:41:07 2022 +0800
[Broker] Fix ``Future.join()`` causing deadlock. (#14469)
Master issue #14438
### Motivation
Invoking the ``join()`` method in the async method will cause some deadlock.
### Modifications
- Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to pure async.
(cherry picked from commit 65318e83f8d5b4207a9398e100390800425d5433)
---
.../broker/service/persistent/PersistentTopic.java | 78 ++++++++++++----------
.../broker/service/PersistentTopicE2ETest.java | 22 +++++-
2 files changed, 63 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a090048..235ea52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2266,42 +2266,48 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return CompletableFuture.completedFuture(null);
}
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
- try {
- PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
- .getNamespaceResources()
- .getPartitionedTopicResources();
- if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) {
- return CompletableFuture.completedFuture(null);
- }
- CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>();
- getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
- .thenAccept((metadata -> {
- // make sure all sub partitions were deleted
- for (int i = 0; i < metadata.partitions; i++) {
- if (brokerService.getPulsar().getPulsarResources().getTopicResources()
- .persistentTopicExists(topicName.getPartition(i)).join()) {
- throw new UnsupportedOperationException();
- }
- }
- }))
- .thenAccept((res) -> partitionedTopicResources.deletePartitionedTopicAsync(topicName)
- .thenAccept((r) -> {
- deleteMetadataFuture.complete(null);
- }).exceptionally(ex -> {
- deleteMetadataFuture.completeExceptionally(ex.getCause());
- return null;
- }))
- .exceptionally((e) -> {
- if (!(e.getCause() instanceof UnsupportedOperationException)) {
- log.error("delete metadata fail", e);
- }
- deleteMetadataFuture.complete(null);
- return null;
- });
- return deleteMetadataFuture;
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
+ PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources();
+ return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+ .thenCompose(partitionedTopicExist -> {
+ if (!partitionedTopicExist) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getBrokerService()
+ .fetchPartitionedTopicMetadataAsync(topicName)
+ .thenCompose((metadata -> {
+ List<CompletableFuture<Boolean>> persistentTopicExists =
+ new ArrayList<>(metadata.partitions);
+ for (int i = 0; i < metadata.partitions; i++) {
+ persistentTopicExists.add(brokerService.getPulsar()
+ .getPulsarResources().getTopicResources()
+ .persistentTopicExists(topicName.getPartition(i)));
+ }
+ List<CompletableFuture<Boolean>> unmodifiablePersistentTopicExists =
+ Collections.unmodifiableList(persistentTopicExists);
+ return FutureUtil.waitForAll(unmodifiablePersistentTopicExists)
+ .thenCompose(unused -> {
+ // make sure all sub partitions were deleted after all future complete
+ Optional<Boolean> anyExistPartition = unmodifiablePersistentTopicExists
+ .stream()
+ .map(CompletableFuture::join)
+ .filter(topicExist -> topicExist)
+ .findAny();
+ if (anyExistPartition.isPresent()) {
+ log.error("[{}] Delete topic metadata failed because"
+ + " another partition exist.", topicName);
+ throw new UnsupportedOperationException(
+ String.format("Another partition exists for [%s].",
+ topicName));
+ } else {
+ return partitionedTopicResources
+ .deletePartitionedTopicAsync(topicName);
+ }
+ });
+ }));
+ }
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index ac5230d..9db6a0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -97,6 +97,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
super.baseSetup();
}
@@ -618,8 +619,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
- }
+ // write again, the topic will be available
+ Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+ producer2.close();
+
+ assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 6. Test for partitioned topic to delete the partitioned metadata
+ String topicGc = "persistent://prop/ns-abc/topic-gc";
+ int partitions = 5;
+ admin.topics().createPartitionedTopic(topicGc, partitions);
+ Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+ producer3.close();
+ assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions);
+ runGC();
+ Awaitility.await().untilAsserted(()-> {
+ assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions, 0);
+ });
+ }
@Data
@ToString
@EqualsAndHashCode