You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/01 03:12:36 UTC

[pulsar] 09/10: [Broker] Fix error when "checkGC#tryToDeletePartitionedMetadata" is ignored (#14482)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0bbf05b82e7ab8d2dce28f5ef2116a335ee81e84
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Mon Feb 28 16:36:07 2022 +0800

    [Broker] Fix error when "checkGC#tryToDeletePartitionedMetadata" is ignored (#14482)
    
    (cherry picked from commit a9fe9efe549af40ff0a90b970064f17af72a786b)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 22 +++++++++++-----------
 .../broker/service/NonPersistentTopicE2ETest.java  | 12 +++++++-----
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 6c14edf..71f1764 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -916,7 +916,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     }
 
                     stopReplProducers().thenCompose(v -> delete(true, false, true))
-                            .thenAccept(__ -> tryToDeletePartitionedMetadata())
+                            .thenCompose(__ -> tryToDeletePartitionedMetadata())
                             .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                             .exceptionally(e -> {
                                 Throwable throwable = e.getCause();
@@ -943,16 +943,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             return CompletableFuture.completedFuture(null);
         }
         TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
-        try {
-            NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
-                    .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
-            if (!partitionedTopicResources.partitionedTopicExists(topicName)) {
-                return CompletableFuture.completedFuture(null);
-            }
-            return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+        NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
+                .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+        return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+                .thenCompose(partitionedTopicExist -> {
+                    if (!partitionedTopicExist) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 83fad2b..0e598f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -37,11 +37,12 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -235,10 +236,11 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         admin.topics().createPartitionedTopic(topicGc, partitions);
         Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
         producer3.close();
-        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
-                TopicName.get(topicGc)).join().partitions == partitions);
+        assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions);
         runGC();
-        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
-                TopicName.get(topicGc)).join().partitions == 0);
+        Awaitility.await().untilAsserted(()->
+                assertEquals(pulsar.getBrokerService().
+                        fetchPartitionedTopicMetadataAsync(TopicName.get(topicGc)).join().partitions, 0));
     }
 }