You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:30 UTC

[pulsar] 21/21: [Broker] Fix ``Future.join()`` causing deadlock. (#14469)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d87a2304fc55828aa1a7a905c2631f594b49b764
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Tue Mar 1 10:41:07 2022 +0800

    [Broker] Fix ``Future.join()`` causing deadlock. (#14469)
    
    Master issue #14438
    
    ### Motivation
    
    Invoking the ``join()`` method in the async method will cause some deadlock.
    
    ### Modifications
    
    - Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to pure async.
    
    (cherry picked from commit 65318e83f8d5b4207a9398e100390800425d5433)
---
 .../broker/service/persistent/PersistentTopic.java | 78 ++++++++++++----------
 .../broker/service/PersistentTopicE2ETest.java     | 22 +++++-
 2 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00934ad..e96fdc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2278,42 +2278,48 @@ public class PersistentTopic extends AbstractTopic
             return CompletableFuture.completedFuture(null);
         }
         TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
-        try {
-            PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
-                    .getNamespaceResources()
-                    .getPartitionedTopicResources();
-            if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) {
-                return CompletableFuture.completedFuture(null);
-            }
-            CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>();
-            getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
-                    .thenAccept((metadata -> {
-                        // make sure all sub partitions were deleted
-                        for (int i = 0; i < metadata.partitions; i++) {
-                            if (brokerService.getPulsar().getPulsarResources().getTopicResources()
-                                    .persistentTopicExists(topicName.getPartition(i)).join()) {
-                                throw new UnsupportedOperationException();
-                            }
-                        }
-                    }))
-                    .thenAccept((res) -> partitionedTopicResources.deletePartitionedTopicAsync(topicName)
-                            .thenAccept((r) -> {
-                        deleteMetadataFuture.complete(null);
-                    }).exceptionally(ex -> {
-                        deleteMetadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    }))
-                    .exceptionally((e) -> {
-                        if (!(e.getCause() instanceof UnsupportedOperationException)) {
-                            log.error("delete metadata fail", e);
-                        }
-                        deleteMetadataFuture.complete(null);
-                        return null;
-                    });
-            return deleteMetadataFuture;
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+        PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources();
+        return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+                .thenCompose(partitionedTopicExist -> {
+                    if (!partitionedTopicExist) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getBrokerService()
+                                .fetchPartitionedTopicMetadataAsync(topicName)
+                                .thenCompose((metadata -> {
+                                    List<CompletableFuture<Boolean>> persistentTopicExists =
+                                            new ArrayList<>(metadata.partitions);
+                                    for (int i = 0; i < metadata.partitions; i++) {
+                                        persistentTopicExists.add(brokerService.getPulsar()
+                                                .getPulsarResources().getTopicResources()
+                                                .persistentTopicExists(topicName.getPartition(i)));
+                                    }
+                                    List<CompletableFuture<Boolean>> unmodifiablePersistentTopicExists =
+                                            Collections.unmodifiableList(persistentTopicExists);
+                                    return FutureUtil.waitForAll(unmodifiablePersistentTopicExists)
+                                            .thenCompose(unused -> {
+                                                // make sure all sub partitions were deleted after all future complete
+                                                Optional<Boolean> anyExistPartition = unmodifiablePersistentTopicExists
+                                                        .stream()
+                                                        .map(CompletableFuture::join)
+                                                        .filter(topicExist -> topicExist)
+                                                        .findAny();
+                                                if (anyExistPartition.isPresent()) {
+                                                    log.error("[{}] Delete topic metadata failed because"
+                                                            + " another partition exist.", topicName);
+                                                    throw new UnsupportedOperationException(
+                                                            String.format("Another partition exists for [%s].",
+                                                                    topicName));
+                                                } else {
+                                                    return partitionedTopicResources
+                                                            .deletePartitionedTopicAsync(topicName);
+                                                }
+                                            });
+                                }));
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 42512b0..daf0ed7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -96,6 +96,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
         super.baseSetup();
     }
 
@@ -617,8 +618,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         runGC();
         assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-    }
 
+        // write again, the topic will be available
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+        producer2.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions);
+        runGC();
+        Awaitility.await().untilAsserted(()-> {
+            assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                    TopicName.get(topicGc)).join().partitions, 0);
+        });
+    }
     @Data
     @ToString
     @EqualsAndHashCode